You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/21 23:01:32 UTC

[1/5] incubator-impala git commit: Remove unused MemPool::peak_allocated_bytes_

Repository: incubator-impala
Updated Branches:
  refs/heads/master f87da848f -> 537ae013e


Remove unused MemPool::peak_allocated_bytes_

The value is not used for anything but there is code devoted to updating
it and testing the value.

Testing:
Ran mem-pool-test to confirm it still works.

Change-Id: I99eba01869914c1d1e0a6ed0cab039d904fecc02
Reviewed-on: http://gerrit.cloudera.org:8080/8114
Reviewed-by: anujphadke <ap...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/77c0e32f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/77c0e32f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/77c0e32f

Branch: refs/heads/master
Commit: 77c0e32f33f55a39387fa4b63f0a23462c805ef8
Parents: f87da84
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Sep 18 13:53:43 2017 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu Sep 21 20:48:00 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/mem-pool-test.cc | 21 ---------------------
 be/src/runtime/mem-pool.cc      |  2 --
 be/src/runtime/mem-pool.h       | 11 ++---------
 3 files changed, 2 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77c0e32f/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index 4f537ab..26176a5 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -82,41 +82,21 @@ TEST(MemPoolTest, Basic) {
     // size of the next allocated chunk (64K)
     p.Allocate(65 * 1024);
     EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
-    if (iter == 0) {
-      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
-    } else {
-      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
-    }
     EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
 
     // Clear() resets allocated data, but doesn't remove any chunks
     p.Clear();
     EXPECT_EQ(0, p.total_allocated_bytes());
-    if (iter == 0) {
-      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
-    } else {
-      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
-    }
     EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
 
     // next allocation reuses existing chunks
     p.Allocate(1024);
     EXPECT_EQ(1024, p.total_allocated_bytes());
-    if (iter == 0) {
-      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
-    } else {
-      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
-    }
     EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
 
     // ... unless it doesn't fit into any available chunk
     p.Allocate(120 * 1024);
     EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
-    if (iter == 0) {
-      EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
-    } else {
-      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
-    }
     EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
 
     // ... Try another chunk that fits into an existing chunk
@@ -127,7 +107,6 @@ TEST(MemPoolTest, Basic) {
     // we're releasing 3 chunks, which get added to p2
     p2.AcquireData(&p, false);
     EXPECT_EQ(0, p.total_allocated_bytes());
-    EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
     EXPECT_EQ(0, p.GetTotalChunkSizes());
 
     p3.AcquireData(&p2, true);  // we're keeping the 65k chunk

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77c0e32f/be/src/runtime/mem-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.cc b/be/src/runtime/mem-pool.cc
index 1b38d37..8f58168 100644
--- a/be/src/runtime/mem-pool.cc
+++ b/be/src/runtime/mem-pool.cc
@@ -43,7 +43,6 @@ MemPool::MemPool(MemTracker* mem_tracker)
   : current_chunk_idx_(-1),
     next_chunk_size_(INITIAL_CHUNK_SIZE),
     total_allocated_bytes_(0),
-    peak_allocated_bytes_(0),
     total_reserved_bytes_(0),
     mem_tracker_(mem_tracker) {
   DCHECK(mem_tracker != NULL);
@@ -217,7 +216,6 @@ void MemPool::AcquireData(MemPool* src, bool keep_current) {
     total_allocated_bytes_ += src->total_allocated_bytes_;
     src->total_allocated_bytes_ = 0;
   }
-  peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
 
   if (!keep_current) src->FreeAll();
   DCHECK(src->CheckIntegrity(false));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77c0e32f/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index f0393b5..648e382 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -69,8 +69,7 @@ class MemTracker;
 /// 28K of chunks have been allocated (chunk sizes: 4K, 8K, 16K)
 /// We track total and peak allocated bytes. At this point they would be the same:
 /// 28k bytes.  A call to Clear will return the allocated memory so
-/// total_allocate_bytes_
-/// becomes 0 while peak_allocate_bytes_ remains at 28k.
+/// total_allocated_bytes_ becomes 0.
 ///     p->Clear();
 /// the entire 1st chunk is returned:
 ///     .. = p->Allocate(4 * 1024);
@@ -82,8 +81,7 @@ class MemTracker;
 ///      MemPool* p2 = new MemPool();
 /// the new mempool receives all chunks containing data from p
 ///      p2->AcquireData(p, false);
-/// At this point p.total_allocated_bytes_ would be 0 while p.peak_allocated_bytes_
-/// remains unchanged.
+/// At this point p.total_allocated_bytes_ would be 0.
 /// The one remaining (empty) chunk is released:
 ///    delete p;
 
@@ -153,7 +151,6 @@ class MemPool {
   std::string DebugString();
 
   int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
-  int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
   int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
   MemTracker* mem_tracker() { return mem_tracker_; }
 
@@ -208,9 +205,6 @@ class MemPool {
   /// sum of allocated_bytes_
   int64_t total_allocated_bytes_;
 
-  /// Maximum number of bytes allocated from this pool at one time.
-  int64_t peak_allocated_bytes_;
-
   /// sum of all bytes allocated in chunks_
   int64_t total_reserved_bytes_;
 
@@ -273,7 +267,6 @@ class MemPool {
     info.allocated_bytes += size;
     total_allocated_bytes_ += size;
     DCHECK_LE(current_chunk_idx_, chunks_.size() - 1);
-    peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
     return result;
   }
 };


[3/5] incubator-impala git commit: IMPALA-5416: Fix an impala-shell command recursion bug

Posted by ta...@apache.org.
IMPALA-5416: Fix an impala-shell command recursion bug

Impala-shell crashes with 2 source commands on the same line and runs
a command multiple times if it shares the same line with a source
command.
The bug is caused by a misuse of cmdqueue. The cmdqueue member of
cmd.Cmd is used to execute commands not directly from user input in an
event loop. When a 'source' is run, execute_query_list() is called which
also executes the commands in cmdqueue, causing them to be executed
twice.
The fix is for execute_query_list() to not run the commands in cmdqueue.
For the non-interactive case, where the event loop won't be run, we call
execute_query_list() with cmdqueue so that the commands get run.
A test case is added to test_shell_interactive.py.

Change-Id: I453af2d4694d47e184031cb07ecd2af259ba20f3
Reviewed-on: http://gerrit.cloudera.org:8080/8063
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bd08ed42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bd08ed42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bd08ed42

Branch: refs/heads/master
Commit: bd08ed42304b7375f5bc9c5ba6d72088cc76678a
Parents: a643854
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Wed Sep 13 18:25:11 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Sep 21 21:41:31 2017 +0000

----------------------------------------------------------------------
 shell/impala_shell.py                 | 6 ++++--
 tests/shell/test_shell_interactive.py | 8 ++++++--
 2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bd08ed42/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 6d9df4f..8db3e43 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1208,7 +1208,7 @@ class ImpalaShell(object, cmd.Cmd):
     if not self.imp_client.connected:
       print_to_stderr('Not connected to Impala, could not execute queries.')
       return False
-    queries = [ self.sanitise_input(q) for q in self.cmdqueue + queries ]
+    queries = [self.sanitise_input(q) for q in queries]
     for q in queries:
       if self.onecmd(q) is CmdStatus.ERROR:
         print_to_stderr('Could not execute command: %s' % q)
@@ -1321,7 +1321,9 @@ def execute_queries_non_interactive_mode(options):
     return
 
   queries = parse_query_text(query_text)
-  if not ImpalaShell(options).execute_query_list(queries):
+  shell = ImpalaShell(options)
+  if not (shell.execute_query_list(shell.cmdqueue) and
+          shell.execute_query_list(queries)):
     sys.exit(1)
 
 if __name__ == "__main__":

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bd08ed42/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index 7b85a55..c37c2be 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -294,14 +294,18 @@ class TestImpalaShellInteractive(object):
     try:
       # Change working dir so that SOURCE command in shell.cmds can find shell2.cmds.
       os.chdir("%s/tests/shell/" % os.environ['IMPALA_HOME'])
-      result = run_impala_shell_interactive("source shell.cmds;")
+      # IMPALA-5416: Test that a command following 'source' won't be run twice.
+      result = run_impala_shell_interactive("source shell.cmds;select \"second command\";")
       assert "Query: use FUNCTIONAL" in result.stderr
       assert "Query: show TABLES" in result.stderr
       assert "alltypes" in result.stdout
-
       # This is from shell2.cmds, the result of sourcing a file from a sourced file.
       assert "select VERSION()" in result.stderr
       assert "version()" in result.stdout
+      assert len(re.findall("'second command'", result.stdout)) == 1
+      # IMPALA-5416: Test that two source commands on a line won't crash the shell.
+      result = run_impala_shell_interactive("source shell.cmds;source shell.cmds;")
+      assert len(re.findall("version\(\)", result.stdout)) == 2
     finally:
       os.chdir(cwd)
 


[2/5] incubator-impala git commit: IMPALA-5966: Fix the result file location of PlannerTest

Posted by ta...@apache.org.
IMPALA-5966: Fix the result file location of PlannerTest

The PlannerTest result files should be written to
$IMPALA_FE_TEST_LOGS_DIR/PlannerTest, but instead they are written to
$IMPALA_FE_TEST_LOGS_DIR with "PlannerTest" as the prefix of their
filenames. This patch fixes this bug and refactors the surrounding code
a little, replacing some Strings with Paths.

Change-Id: I20005bd0421e1bb9f3eedbb003c97d92a8faf110
Reviewed-on: http://gerrit.cloudera.org:8080/8113
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a6438540
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a6438540
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a6438540

Branch: refs/heads/master
Commit: a6438540a221b5df85ac764652e65a3f699eea0b
Parents: 77c0e32
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Wed Sep 20 14:39:24 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Sep 21 21:31:23 2017 +0000

----------------------------------------------------------------------
 .../apache/impala/planner/PlannerTestBase.java  | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a6438540/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 6ab4b8b..ae6488d 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -86,8 +87,9 @@ import com.google.common.collect.Sets;
 public class PlannerTestBase extends FrontendTestBase {
   private final static Logger LOG = LoggerFactory.getLogger(PlannerTest.class);
   private final static boolean GENERATE_OUTPUT_FILE = true;
-  private final String testDir_ = "functional-planner/queries/PlannerTest";
-  private static String outDir_;
+  private final java.nio.file.Path testDir_ = Paths.get("functional-planner", "queries",
+      "PlannerTest");
+  private static java.nio.file.Path outDir_;
   private static KuduClient kuduClient_;
 
   // Map from plan ID (TPlanNodeId) to the plan node with that ID.
@@ -110,11 +112,8 @@ public class PlannerTestBase extends FrontendTestBase {
       kuduClient_ = new KuduClient.KuduClientBuilder("127.0.0.1:7051").build();
     }
     String logDir = System.getenv("IMPALA_FE_TEST_LOGS_DIR");
-    if (logDir != null) {
-      outDir_ = logDir + "/PlannerTest";
-    } else {
-      outDir_ = "/tmp/PlannerTest";
-    }
+    if (logDir == null) logDir = "/tmp";
+    outDir_ = Paths.get(logDir, "PlannerTest");
   }
 
   @Before
@@ -736,7 +735,7 @@ public class PlannerTestBase extends FrontendTestBase {
 
   private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options,
       boolean ignoreExplainHeader) {
-    String fileName = testDir_ + "/" + testFile + ".test";
+    String fileName = testDir_.resolve(testFile + ".test").toString();
     TestFileParser queryFileParser = new TestFileParser(fileName);
     StringBuilder actualOutput = new StringBuilder();
 
@@ -758,9 +757,8 @@ public class PlannerTestBase extends FrontendTestBase {
     // Create the actual output file
     if (GENERATE_OUTPUT_FILE) {
       try {
-        File outDirFile = new File(outDir_);
-        outDirFile.mkdirs();
-        FileWriter fw = new FileWriter(outDir_ + testFile + ".test");
+        outDir_.toFile().mkdirs();
+        FileWriter fw = new FileWriter(outDir_.resolve(testFile + ".test").toFile());
         fw.write(actualOutput.toString());
         fw.close();
       } catch (IOException e) {


[4/5] incubator-impala git commit: IMPALA-5250: Unify decompressor output_length semantics

Posted by ta...@apache.org.
IMPALA-5250: Unify decompressor output_length semantics

This patch makes the semantics of the output_length parameter in
Codec::ProcessBlock the same across all codecs. In the existing code,
different decompressors treat output_length differently:
1. SnappyDecompressor needs output_length to be greater than or equal
   to the actual decompressed length, but it does not set it to the
   actual decompressed length after decompression.
2. SnappyBlockDecompressor and Lz4Decompressor require output_length to
   be exactly the same as the actual decompressed length, otherwise
   decompression fails.
3. Other decompressors need output_length to be greater than or equal to
   the actual decompressed length and will set it to actual decompressed
   length if oversized.
This inconsistency leads to a bug where the error message is
nondeterministic when the compressed block is corrupted. This patch makes
all decompressors behave like a modified version of 3:
output_length should be greater than or equal to the actual decompressed
length, and it will be set to the actual decompressed length if oversized.
A decompression failure sets it to 0.
Lz4Decompressor will use the "safe" instead of the "fast" decompression
function, for the latter is insecure with corrupted data and requires
the decompressed length to be known.

Testing: A testcase is added checking that the decompressors can handle
an oversized output buffer correctly. A regression test for the exact
case described in IMPALA-5250 is also added. A benchmark is run on a
16-node cluster testing the performance impact of the LZ4Decompressor
change and no performance regression is found.

Change-Id: Ifd42942b169921a7eb53940c3762bc45bb82a993
Reviewed-on: http://gerrit.cloudera.org:8080/8030
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b1752268
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b1752268
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b1752268

Branch: refs/heads/master
Commit: b175226869853e219165a8fe3f7ab67ba7187caf
Parents: bd08ed4
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Fri Sep 8 20:11:35 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Sep 21 22:25:09 2017 +0000

----------------------------------------------------------------------
 be/src/util/codec.h            |  4 +--
 be/src/util/decompress-test.cc | 43 ++++++++++++++++------
 be/src/util/decompress.cc      | 72 ++++++++++++++++++++-----------------
 3 files changed, 74 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1752268/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index b150b3c..d21e399 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -104,8 +104,8 @@ class Codec {
   /// transformed output). If output_preallocated is false, *output will be allocated from
   /// the codec's mempool. In this case, a mempool must have been passed into the c'tor.
   //
-  /// In either case, *output_length will be set to the actual length of the transformed
-  /// output.
+  /// If the transformation succeeds, *output_length will be set to the actual length of
+  /// the transformed output. Otherwise it will be set to 0.
   //
   /// Inputs:
   ///   input_length: length of the data to process

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1752268/be/src/util/decompress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 1e8760d..d170501 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -70,8 +70,6 @@ class DecompressorTest : public ::testing::Test {
           sizeof(input_), input_);
       CompressAndDecompressNoOutputAllocated(compressor.get(), decompressor.get(),
           0, NULL);
-      DecompressInsufficientOutputBuffer(compressor.get(), decompressor.get(),
-          sizeof(input_), input_);
     } else {
       CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_), input_);
       // Test with odd-length input (to test the calculation of block-sizes in
@@ -88,10 +86,9 @@ class DecompressorTest : public ::testing::Test {
         // bzip does not allow NULL input
         CompressAndDecompress(compressor.get(), decompressor.get(), 0, input_);
       }
-      DecompressInsufficientOutputBuffer(compressor.get(), decompressor.get(),
-          sizeof(input_), input_);
     }
-
+    DecompressOverUnderSizedOutputBuffer(compressor.get(), decompressor.get(),
+        sizeof(input_), input_);
     compressor->Close();
     decompressor->Close();
   }
@@ -151,10 +148,11 @@ class DecompressorTest : public ::testing::Test {
     EXPECT_EQ(memcmp(input, output, input_len), 0);
   }
 
-  // Test the behavior when the decompressor is given too little space to produce
-  // the decompressed output. Verify that the decompressor returns an error and
-  // does not overflow the provided buffer.
-  void DecompressInsufficientOutputBuffer(Codec* compressor, Codec* decompressor,
+  // Test the behavior when the decompressor is given too little / too much space.
+  // Verify that the decompressor returns an error when the space is not enough, gives
+  // the correct output size when the space is enough, and does not write beyond the
+  // output size it claims.
+  void DecompressOverUnderSizedOutputBuffer(Codec* compressor, Codec* decompressor,
       int64_t input_len, uint8_t* input) {
     uint8_t* compressed;
     int64_t compressed_length;
@@ -180,9 +178,18 @@ class DecompressorTest : public ::testing::Test {
     u_int32_t *canary = (u_int32_t *) &output[output_len];
     *canary = 0x66aa77bb;
     Status status = decompressor->ProcessBlock(true, compressed_length, compressed,
-                                               &output_len, &output);
+        &output_len, &output);
     EXPECT_EQ(*canary, 0x66aa77bb);
     EXPECT_FALSE(status.ok());
+    EXPECT_EQ(output_len, 0);
+
+    // Check that the output length is the same as input when the decompressor is provided
+    // with abundant space.
+    output_len = input_len * 2;
+    output = mem_pool_.Allocate(output_len);
+    EXPECT_TRUE(decompressor->ProcessBlock(true, compressed_length, compressed,
+        &output_len, &output).ok());
+    EXPECT_EQ(output_len, input_len);
   }
 
   void Compress(Codec* compressor, int64_t input_len, uint8_t* input,
@@ -418,6 +425,22 @@ TEST_F(DecompressorTest, Impala1506) {
   pool.FreeAll();
 }
 
+TEST_F(DecompressorTest, Impala5250) {
+  // Regression test for IMPALA-5250. It tests that SnappyDecompressor handles an input
+  // buffer with a zero byte correctly. It should set the output_length to 0.
+  MemTracker trax;
+  MemPool pool(&trax);
+  scoped_ptr<Codec> decompressor;
+  EXPECT_OK(Codec::CreateDecompressor(&pool, true, impala::THdfsCompression::SNAPPY,
+      &decompressor));
+  uint8_t buf[1]{0};
+  uint8_t out_buf[1];
+  int64_t output_length = 1;
+  uint8_t* output = out_buf;
+  EXPECT_OK(decompressor->ProcessBlock(true, 1, buf, &output_length, &output));
+  EXPECT_EQ(output_length, 0);
+}
+
 }
 
 int main(int argc, char **argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1752268/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 2488586..f3466b9 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -139,7 +139,9 @@ Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8
 
 Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
-  if (UNLIKELY(output_preallocated && *output_length == 0)) {
+  int64_t output_length_local = *output_length;
+  *output_length = 0;
+  if (UNLIKELY(output_preallocated && output_length_local == 0)) {
     // The zlib library does not allow *output to be nullptr, even when output_length is 0
     // (inflate() will return Z_STREAM_ERROR). We don't consider this an error, so bail
     // early if no output is expected. Note that we don't signal an error if the input
@@ -162,7 +164,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     }
     use_temp = true;
     *output = out_buffer_;
-    *output_length = buffer_length_;
+    output_length_local = buffer_length_;
   }
 
   // Reset the stream for this block
@@ -182,7 +184,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
     stream_.avail_in = input_length;
     stream_.next_out = reinterpret_cast<Bytef*>(*output);
-    stream_.avail_out = *output_length;
+    stream_.avail_out = output_length_local;
 
     if (use_temp) {
       // We don't know the output size, so this might fail.
@@ -198,7 +200,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
     if (!use_temp) {
       stringstream ss;
       ss << "Too small a buffer passed to GzipDecompressor. InputLength="
-        << input_length << " OutputLength=" << *output_length;
+        << input_length << " OutputLength=" << output_length_local;
       return Status(ss.str());
     }
 
@@ -213,7 +215,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
           nullptr, details, buffer_length_);
     }
     *output = out_buffer_;
-    *output_length = buffer_length_;
+    output_length_local = buffer_length_;
     ret = inflateReset(&stream_);
   }
 
@@ -228,7 +230,7 @@ Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_le
 
   // stream_.avail_out is the number of bytes *left* in the out buffer, but
   // we're interested in the number of bytes used.
-  *output_length = *output_length - stream_.avail_out;
+  *output_length = output_length_local - stream_.avail_out;
   if (use_temp) memory_pool_->AcquireData(temp_memory_pool_.get(), reuse_buffer_);
   return Status::OK();
 }
@@ -257,14 +259,16 @@ int64_t BzipDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input)
 
 Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
-  if (UNLIKELY(output_preallocated && *output_length == 0)) {
+  int64_t output_length_local = *output_length;
+  *output_length = 0;
+  if (UNLIKELY(output_preallocated && output_length_local == 0)) {
     // Same problem as zlib library, see comment in GzipDecompressor::ProcessBlock().
     return Status::OK();
   }
 
   bool use_temp = false;
   if (output_preallocated) {
-    buffer_length_ = *output_length;
+    buffer_length_ = output_length_local;
     out_buffer_ = *output;
   } else if (!reuse_buffer_ || out_buffer_ == nullptr) {
     // guess that we will need 2x the input length.
@@ -423,11 +427,14 @@ int64_t SnappyBlockDecompressor::MaxOutputLen(int64_t input_len, const uint8_t*
 // Utility function to decompress snappy block compressed data.  If size_only is true,
 // this function does not decompress but only computes the output size and writes
 // the result to *output_len.
-// If size_only is false, output must be preallocated to output_len and this needs to
-// be exactly big enough to hold the decompressed output.
-// size_only is a O(1) operations (just reads a single varint for each snappy block).
+// If size_only is false, output buffer size must be at least *output_len. *output_len is
+// updated with the actual output size if the decompression succeeds, and is set to 0
+// otherwise.
+// size_only is an O(1) operation (just reads a single varint for each snappy block).
 static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
     bool size_only, int64_t* output_len, char* output) {
+  int64_t buffer_size = *output_len;
+  *output_len = 0;
   int64_t uncompressed_total_len = 0;
   while (input_len > 0) {
     uint32_t uncompressed_block_len = ReadWriteUtil::GetInt<uint32_t>(input);
@@ -435,7 +442,7 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
     input_len -= sizeof(uint32_t);
 
     if (!size_only) {
-      int64_t remaining_output_size = *output_len - uncompressed_total_len;
+      int64_t remaining_output_size = buffer_size - uncompressed_total_len;
       if (remaining_output_size < uncompressed_block_len) {
         return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
       }
@@ -448,7 +455,6 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
       input_len -= sizeof(uint32_t);
 
       if (compressed_len == 0 || compressed_len > input_len) {
-        *output_len = 0;
         return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
       }
 
@@ -456,14 +462,13 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
       size_t uncompressed_len;
       if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
               compressed_len, &uncompressed_len)) {
-        *output_len = 0;
         return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
       }
       DCHECK_GT(uncompressed_len, 0);
 
       if (!size_only) {
         // Check output bounds
-        int64_t remaining_output_size = *output_len - uncompressed_total_len;
+        int64_t remaining_output_size = buffer_size - uncompressed_total_len;
         if (remaining_output_size < uncompressed_len) {
           return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
         }
@@ -481,23 +486,21 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
       uncompressed_total_len += uncompressed_len;
     }
   }
-
-  if (size_only) {
-    *output_len = uncompressed_total_len;
-  } else if (*output_len != uncompressed_total_len) {
-    return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
-  }
+  *output_len = uncompressed_total_len;
   return Status::OK();
 }
 
 Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t input_len,
     const uint8_t* input, int64_t* output_len, uint8_t** output) {
+  int64_t output_length_local = *output_len;
+  *output_len = 0;
   if (!output_preallocated) {
     // If we don't know the size beforehand, compute it.
-    RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, output_len, nullptr));
-    if (!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < *output_len) {
+    RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, &output_length_local,
+        nullptr));
+    if (!reuse_buffer_ || out_buffer_ == nullptr || buffer_length_ < output_length_local) {
       // Need to allocate a new buffer
-      buffer_length_ = *output_len;
+      buffer_length_ = output_length_local;
       out_buffer_ = memory_pool_->TryAllocate(buffer_length_);
       if (UNLIKELY(out_buffer_ == nullptr)) {
         string details = Substitute(DECOMPRESSOR_MEM_LIMIT_EXCEEDED, "SnappyBlock",
@@ -510,7 +513,9 @@ Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t i
   }
 
   char* out_ptr = reinterpret_cast<char*>(*output);
-  RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, output_len, out_ptr));
+  RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, &output_length_local,
+      out_ptr));
+  *output_len = output_length_local;
   return Status::OK();
 }
 
@@ -530,6 +535,8 @@ int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input
 
 Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
+  int64_t output_length_local = *output_length;
+  *output_length = 0;
   int64_t uncompressed_length = MaxOutputLen(input_length, input);
   if (uncompressed_length < 0) {
     return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
@@ -548,20 +555,18 @@ Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_
       }
     }
     *output = out_buffer_;
-    *output_length = uncompressed_length;
   } else {
     // If the preallocated buffer is too small (e.g. if the file metadata is corrupt),
     // bail out early. Otherwise, this could result in a buffer overrun.
-    if (uncompressed_length > *output_length) {
+    if (uncompressed_length > output_length_local) {
       return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
     }
   }
-
   if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
           static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
     return Status("Snappy: RawUncompress failed");
   }
-
+  *output_length = uncompressed_length;
   return Status::OK();
 }
 
@@ -577,12 +582,13 @@ int64_t Lz4Decompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
 Status Lz4Decompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
   DCHECK(output_preallocated) << "Lz4 Codec implementation must have allocated output";
-  // LZ4_decompress_fast will cause a segmentation fault if passed a nullptr output.
   if(*output_length == 0) return Status::OK();
-  if (LZ4_decompress_fast(reinterpret_cast<const char*>(input),
-          reinterpret_cast<char*>(*output), *output_length) != input_length) {
+  int ret = LZ4_decompress_safe(reinterpret_cast<const char*>(input),
+      reinterpret_cast<char*>(*output), input_length, *output_length);
+  if (ret < 0) {
+    *output_length = 0;
     return Status("Lz4: uncompress failed");
   }
-
+  *output_length = ret;
   return Status::OK();
 }


[5/5] incubator-impala git commit: fix query report generator typo

Posted by ta...@apache.org.
fix query report generator typo

Change-Id: I8585c5f85781ff0b80292823256fe8866d3f95bd
Reviewed-on: http://gerrit.cloudera.org:8080/8118
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Michael Brown <mi...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/537ae013
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/537ae013
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/537ae013

Branch: refs/heads/master
Commit: 537ae013eef8022532fba5b193d5c9d0b1892dba
Parents: b175226
Author: Michael Brown <mi...@cloudera.com>
Authored: Thu Sep 21 08:52:29 2017 -0700
Committer: Michael Brown <mi...@cloudera.com>
Committed: Thu Sep 21 22:57:26 2017 +0000

----------------------------------------------------------------------
 tests/comparison/leopard/templates/index.template | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/537ae013/tests/comparison/leopard/templates/index.template
----------------------------------------------------------------------
diff --git a/tests/comparison/leopard/templates/index.template b/tests/comparison/leopard/templates/index.template
index 9cb1c17..2e37050 100644
--- a/tests/comparison/leopard/templates/index.template
+++ b/tests/comparison/leopard/templates/index.template
@@ -63,7 +63,7 @@ under the License.
        </table>
 
        {% if schedule_items %}
-       <h3>Run Schdule</h3>
+       <h3>Run Schedule</h3>
        <table class="table table-hover">
          <tr>
            <th>Run Name</th>