You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/27 05:01:16 UTC

[1/5] impala git commit: IMPALA-7218: [DOCS] Fixed an example

Repository: impala
Updated Branches:
  refs/heads/master cec33fa0a -> 170956b85


IMPALA-7218: [DOCS] Fixed an example

Change-Id: I3c455c0309113d1122e24c73882d85ef282fd62e
Reviewed-on: http://gerrit.cloudera.org:8080/11049
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Pooja Nilangekar <po...@cloudera.com>
Reviewed-by: Fredy Wijaya <fw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b0da2e0c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b0da2e0c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b0da2e0c

Branch: refs/heads/master
Commit: b0da2e0c1d8ea8b5c1f806b268b725c585421c42
Parents: cec33fa
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Wed Jul 25 12:37:06 2018 -0700
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Wed Jul 25 21:50:34 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_alter_view.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b0da2e0c/docs/topics/impala_alter_view.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_alter_view.xml b/docs/topics/impala_alter_view.xml
index 08d26bb..09558a2 100644
--- a/docs/topics/impala_alter_view.xml
+++ b/docs/topics/impala_alter_view.xml
@@ -79,7 +79,7 @@ ALTER VIEW [<varname>database_name</varname>.]<varname>view_name</varname>
           For example:
 <codeblock>
 ALTER VIEW v1 AS SELECT x, UPPER(s) s FROM t2;
-ALTER VIEW v1 (c1, c2, c3) AS SELECT x, UPPER(s) s FROM t2;
+ALTER VIEW v1 (c1, c2) AS SELECT x, UPPER(s) s FROM t2;
 ALTER VIEW v7 (c1 COMMENT 'Comment for c1', c2) AS SELECT t1.c1, t1.c2 FROM t1;
 </codeblock>
         </p>


[2/5] impala git commit: IMPALA-7340. Only send necessary fields in THdfsPartition

Posted by ta...@apache.org.
IMPALA-7340. Only send necessary fields in THdfsPartition

The THdfsPartition Thrift struct is used both for partition metadata
(sent from catalogd to coordinator) and for descriptors (sent from
coordinator frontend to backends). In the case of the descriptor,
not all fields are actually used.

This patch cleans up the Thrift struct definition to be more clear
which fields are used where, and changes the serialization code to
only fill in the necessary fields.

Change-Id: I97e8402efdfdeea06463bb71a40ebb6abd1f11f0
Reviewed-on: http://gerrit.cloudera.org:8080/11026
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/04ed7ac1
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/04ed7ac1
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/04ed7ac1

Branch: refs/heads/master
Commit: 04ed7ac132d37bff1c648a807059705ae5fac220
Parents: b0da2e0
Author: Todd Lipcon <to...@cloudera.com>
Authored: Wed Jul 18 16:53:04 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Jul 26 00:14:26 2018 +0000

----------------------------------------------------------------------
 common/thrift/CatalogObjects.thrift             | 51 +++++++++++++++-----
 .../apache/impala/catalog/CatalogObject.java    | 12 +++++
 .../apache/impala/catalog/FeCatalogUtils.java   | 28 ++++++-----
 .../org/apache/impala/catalog/HdfsTable.java    | 36 ++++++++------
 .../impala/catalog/local/LocalFsTable.java      |  6 +--
 5 files changed, 90 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/04ed7ac1/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index d463300..b73c5e6 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -241,18 +241,38 @@ struct THdfsPartitionLocation {
 // Represents an HDFS partition
 // TODO(vercegovac): rename to TFsPartition
 struct THdfsPartition {
+
+  // ============================================================
+  // Fields included in the "Descriptor" format sent to the backend
+  // as part of query plans and fragments.
+  // ============================================================
+
   1: required byte lineDelim
   2: required byte fieldDelim
   3: required byte collectionDelim
   4: required byte mapKeyDelim
   5: required byte escapeChar
   6: required THdfsFileFormat fileFormat
+
   // These are Literal expressions
   7: list<Exprs.TExpr> partitionKeyExprs
   8: required i32 blockSize
-  9: optional list<THdfsFileDesc> file_desc
+
   10: optional THdfsPartitionLocation location
 
+  // Unique (in this table) id of this partition. May be set to
+  // PROTOTYPE_PARTITION_ID when this object is used to describe
+  // a partition which will be created as part of a query.
+  14: optional i64 id
+
+
+  // ============================================================
+  // Fields only included when the catalogd serializes a table to be
+  // sent to the impalad as part of a catalog update.
+  // ============================================================
+
+  9: optional list<THdfsFileDesc> file_desc
+
   // The access level Impala has on this partition (READ_WRITE, READ_ONLY, etc).
   11: optional TAccessLevel access_level
 
@@ -263,10 +283,6 @@ struct THdfsPartition {
   // underlying data is cached).
   13: optional bool is_marked_cached
 
-  // Unique (in this table) id of this partition. If -1, the partition does not currently
-  // exist.
-  14: optional i64 id
-
   // (key,value) pairs stored in the Hive Metastore.
   15: optional map<string, string> hms_parameters
 
@@ -282,12 +298,17 @@ struct THdfsPartition {
   18: optional bool has_incremental_stats
 }
 
-// Constant partition ID used for THdfsPartition.prototype_partition above.
+// Constant partition ID used for THdfsPartition.prototype_partition below.
 // Must be < 0 to avoid collisions
 const i64 PROTOTYPE_PARTITION_ID = -1;
 
 
 struct THdfsTable {
+  // ============================================================
+  // Fields included in the "Descriptor" format sent to the backend
+  // as part of query plans and fragments.
+  // ============================================================
+
   1: required string hdfsBaseDir
 
   // Deprecated. Use TTableDescriptor.colNames.
@@ -303,22 +324,28 @@ struct THdfsTable {
   6: optional string avroSchema
 
   // Map from partition id to partition metadata.
-  // Does not include the special prototype partition -1 (see below).
+  // Does not include the special prototype partition with id=PROTOTYPE_PARTITION_ID --
+  // that partition is separately included below.
   4: required map<i64, THdfsPartition> partitions
 
   // Prototype partition, used when creating new partitions during insert.
   10: required THdfsPartition prototype_partition
 
-  // Each TNetworkAddress is a datanode which contains blocks of a file in the table.
-  // Used so that each THdfsFileBlock can just reference an index in this list rather
-  // than duplicate the list of network address, which helps reduce memory usage.
-  7: optional list<Types.TNetworkAddress> network_addresses
-
   // REMOVED: 8: optional bool multiple_filesystems
 
   // The prefixes of locations of partitions in this table. See THdfsPartitionLocation for
   // the description of how a prefix is computed.
   9: optional list<string> partition_prefixes
+
+  // ============================================================
+  // Fields only included when the catalogd serializes a table to be
+  // sent to the impalad as part of a catalog update.
+  // ============================================================
+
+  // Each TNetworkAddress is a datanode which contains blocks of a file in the table.
+  // Used so that each THdfsFileBlock can just reference an index in this list rather
+  // than duplicate the list of network address, which helps reduce memory usage.
+  7: optional list<Types.TNetworkAddress> network_addresses
 }
 
 struct THBaseTable {

http://git-wip-us.apache.org/repos/asf/impala/blob/04ed7ac1/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
index 34310c3..78b8368 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogObject.java
@@ -23,6 +23,18 @@ import org.apache.impala.thrift.TCatalogObjectType;
  * Interface that all catalog objects implement.
  */
 public interface CatalogObject extends HasName {
+
+  /**
+   * Catalog objects are often serialized to Thrift. When doing so, many of the
+   * objects have a minimal "descriptor" form used in query execution as well as
+   * a more complete "full" form with all information, used when transferring the
+   * catalog object from catalogd to the impalads.
+   */
+  static enum ThriftObjectType {
+    FULL,
+    DESCRIPTOR_ONLY
+  };
+
   // Returns the TCatalogObject type of this Catalog object.
   public TCatalogObjectType getCatalogObjectType();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/04ed7ac1/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index 12b152f..4f1d68d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -31,6 +31,7 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.analysis.ToSqlUtils;
+import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.thrift.TColumnDescriptor;
 import org.apache.impala.thrift.THdfsPartition;
@@ -322,7 +323,7 @@ public abstract class FeCatalogUtils {
   }
 
   public static THdfsPartition fsPartitionToThrift(FeFsPartition part,
-      boolean includeFileDesc, boolean includeIncrementalStats) {
+      ThriftObjectType type, boolean includeIncrementalStats) {
     HdfsStorageDescriptor sd = part.getInputFormatDescriptor();
     THdfsPartition thriftHdfsPart = new THdfsPartition(
         sd.getLineDelim(),
@@ -333,19 +334,20 @@ public abstract class FeCatalogUtils {
         sd.getFileFormat().toThrift(),
         Expr.treesToThrift(part.getPartitionValues()),
         sd.getBlockSize());
-    thriftHdfsPart.setLocation(part.getLocationAsThrift());
-    thriftHdfsPart.setStats(new TTableStats(part.getNumRows()));
-    thriftHdfsPart.setAccess_level(part.getAccessLevel());
-    thriftHdfsPart.setIs_marked_cached(part.isMarkedCached());
     thriftHdfsPart.setId(part.getId());
-    thriftHdfsPart.setHas_incremental_stats(part.hasIncrementalStats());
-    // IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
-    // may try to serialize the returned THdfsPartition after releasing the table's lock,
-    // and another thread doing DDL may modify the map.
-    thriftHdfsPart.setHms_parameters(Maps.newHashMap(
-        includeIncrementalStats ? part.getParameters() :
-          part.getFilteredHmsParameters()));
-    if (includeFileDesc) {
+    thriftHdfsPart.setLocation(part.getLocationAsThrift());
+    if (type == ThriftObjectType.FULL) {
+      thriftHdfsPart.setStats(new TTableStats(part.getNumRows()));
+      thriftHdfsPart.setAccess_level(part.getAccessLevel());
+      thriftHdfsPart.setIs_marked_cached(part.isMarkedCached());
+      thriftHdfsPart.setHas_incremental_stats(part.hasIncrementalStats());
+      // IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
+      // may try to serialize the returned THdfsPartition after releasing the table's lock,
+      // and another thread doing DDL may modify the map.
+      thriftHdfsPart.setHms_parameters(Maps.newHashMap(
+          includeIncrementalStats ? part.getParameters() :
+            part.getFilteredHmsParameters()));
+
       // Add block location information
       long numBlocks = 0;
       long totalFileBytes = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/04ed7ac1/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 0995086..4ae1f9b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1722,7 +1722,8 @@ public class HdfsTable extends Table implements FeFsTable {
     // need any information below the THdfsPartition level.
     TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
         getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
-    tableDesc.setHdfsTable(getTHdfsTable(false, referencedPartitions));
+    tableDesc.setHdfsTable(getTHdfsTable(ThriftObjectType.DESCRIPTOR_ONLY,
+        referencedPartitions));
     return tableDesc;
   }
 
@@ -1731,25 +1732,30 @@ public class HdfsTable extends Table implements FeFsTable {
     // Send all metadata between the catalog service and the FE.
     TTable table = super.toThrift();
     table.setTable_type(TTableType.HDFS_TABLE);
-    table.setHdfs_table(getTHdfsTable(true, null));
+    table.setHdfs_table(getTHdfsTable(ThriftObjectType.FULL, null));
     return table;
   }
 
   /**
-   * Create a THdfsTable corresponding to this HdfsTable. If includeFileDesc is true,
-   * then then all partitions and THdfsFileDescs of each partition should be included.
-   * Otherwise, don't include any THdfsFileDescs, and include only those partitions in
-   * the refPartitions set (the backend doesn't need metadata for unreferenced
-   * partitions). To prevent the catalog from hitting an OOM error while trying to
+   * Create a THdfsTable corresponding to this HdfsTable. If serializing the "FULL"
+   * information, then then all partitions and THdfsFileDescs of each partition should be
+   * included. Otherwise, don't include any THdfsFileDescs, and include only those
+   * partitions in the refPartitions set (the backend doesn't need metadata for
+   * unreferenced partitions). In addition, metadata that is not used by the backend will
+   * be omitted.
+   *
+   * To prevent the catalog from hitting an OOM error while trying to
    * serialize large partition incremental stats, we estimate the stats size and filter
    * the incremental stats data from partition objects if the estimate exceeds
    * --inc_stats_size_limit_bytes. This function also collects storage related statistics
    *  (e.g. number of blocks, files, etc) in order to compute an estimate of the metadata
    *  size of this table.
    */
-  private THdfsTable getTHdfsTable(boolean includeFileDesc, Set<Long> refPartitions) {
-    // includeFileDesc implies all partitions should be included (refPartitions == null).
-    Preconditions.checkState(!includeFileDesc || refPartitions == null);
+  private THdfsTable getTHdfsTable(ThriftObjectType type, Set<Long> refPartitions) {
+    if (type == ThriftObjectType.FULL) {
+      // "full" implies all partitions should be included.
+      Preconditions.checkArgument(refPartitions == null);
+    }
     long memUsageEstimate = 0;
     int numPartitions =
         (refPartitions == null) ? partitionMap_.values().size() : refPartitions.size();
@@ -1764,13 +1770,13 @@ public class HdfsTable extends Table implements FeFsTable {
       long id = partition.getId();
       if (refPartitions == null || refPartitions.contains(id)) {
         THdfsPartition tHdfsPartition = FeCatalogUtils.fsPartitionToThrift(
-            partition, includeFileDesc, includeIncrementalStats);
+            partition, type, includeIncrementalStats);
         if (tHdfsPartition.isSetHas_incremental_stats() &&
             tHdfsPartition.isHas_incremental_stats()) {
           memUsageEstimate += getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
           hasIncrementalStats_ = true;
         }
-        if (includeFileDesc) {
+        if (type == ThriftObjectType.FULL) {
           Preconditions.checkState(tHdfsPartition.isSetNum_blocks() &&
               tHdfsPartition.isSetTotal_file_size_bytes());
           stats.numBlocks += tHdfsPartition.getNum_blocks();
@@ -1781,10 +1787,10 @@ public class HdfsTable extends Table implements FeFsTable {
         idToPartition.put(id, tHdfsPartition);
       }
     }
-    if (includeFileDesc) fileMetadataStats_.set(stats);
+    if (type == ThriftObjectType.FULL) fileMetadataStats_.set(stats);
 
     THdfsPartition prototypePartition = FeCatalogUtils.fsPartitionToThrift(
-        prototypePartition_, false, false);
+        prototypePartition_, ThriftObjectType.DESCRIPTOR_ONLY, false);
 
     memUsageEstimate += fileMetadataStats_.numFiles * PER_FD_MEM_USAGE_BYTES +
         fileMetadataStats_.numBlocks * PER_BLOCK_MEM_USAGE_BYTES;
@@ -1792,7 +1798,7 @@ public class HdfsTable extends Table implements FeFsTable {
     THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
         nullPartitionKeyValue_, nullColumnValue_, idToPartition, prototypePartition);
     hdfsTable.setAvroSchema(avroSchema_);
-    if (includeFileDesc) {
+    if (type == ThriftObjectType.FULL) {
       // Network addresses are used only by THdfsFileBlocks which are inside
       // THdfsFileDesc, so include network addreses only when including THdfsFileDesc.
       hdfsTable.setNetwork_addresses(hostIndex_.getList());

http://git-wip-us.apache.org/repos/asf/impala/blob/04ed7ac1/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 35044e3..82af240 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.FeCatalogUtils;
@@ -229,13 +230,12 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     for (FeFsPartition partition : partitions) {
       idToPartition.put(partition.getId(),
           FeCatalogUtils.fsPartitionToThrift(partition,
-              /*includeFileDesc=*/false,
+              ThriftObjectType.DESCRIPTOR_ONLY,
               /*includeIncrementalStats=*/false));
     }
 
     THdfsPartition tPrototypePartition = FeCatalogUtils.fsPartitionToThrift(
-        createPrototypePartition(),
-        /*includeFileDesc=*/false,
+        createPrototypePartition(), ThriftObjectType.DESCRIPTOR_ONLY,
         /*includeIncrementalStats=*/false);
 
     // TODO(todd): implement avro schema support


[3/5] impala git commit: [DOCS] Corrected the default inc_stats_size_limit_bytes value

Posted by ta...@apache.org.
[DOCS] Corrected the default inc_stats_size_limit_bytes value

Change-Id: I4cacc6f1b9aaa1e2105e5436451ee906e3111679
Reviewed-on: http://gerrit.cloudera.org:8080/11050
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/729e1fb6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/729e1fb6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/729e1fb6

Branch: refs/heads/master
Commit: 729e1fb6788eee9d48dc73860d4bfc892565d134
Parents: 04ed7ac
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Wed Jul 25 12:49:54 2018 -0700
Committer: Alex Rodoni <ar...@cloudera.com>
Committed: Thu Jul 26 23:18:16 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_perf_stats.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/729e1fb6/docs/topics/impala_perf_stats.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_perf_stats.xml b/docs/topics/impala_perf_stats.xml
index 9bf7112..15a00f7 100644
--- a/docs/topics/impala_perf_stats.xml
+++ b/docs/topics/impala_perf_stats.xml
@@ -726,7 +726,7 @@ show column stats year_month_day;
 
           <p>
             The default value for <codeph>inc_stats_size_limit_bytes</codeph>
-            is 20971520, 200 MB.
+            is 209715200, 200 MB.
           </p>
 
           <p> To change the <codeph>inc_stats_size_limit_bytes</codeph> value,


[5/5] impala git commit: IMPALA-7360: sequence scanners sometimes skip blocks

Posted by ta...@apache.org.
IMPALA-7360: sequence scanners sometimes skip blocks

The handling of sync markers after processing a block was broken - eos_
was set if the sync marker straddles the boundary. The expected
behaviour (documented by comments) in this case is that the current
scanner should process the next block, if there is one.

If you look at the logic before the IMPALA-3905 change in commit
931bf49cd90e496df6bf260ae668ec6944f0016c, it split the checking
of eosr() and eof() similar to this patch.

Testing:
Add regression tests that scans a large table with a variety of
different scan range lengths, with some randomisation to exercise
different edge cases. This reliably triggered the bug.

Change-Id: I49a70a4925b0271204b8eea4f980299d7654805a
Reviewed-on: http://gerrit.cloudera.org:8080/11062
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/170956b8
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/170956b8
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/170956b8

Branch: refs/heads/master
Commit: 170956b8541a2b159ba711eb1022451f159e3060
Parents: 3bbfbb5
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jul 26 11:21:35 2018 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Fri Jul 27 05:00:20 2018 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc            | 47 ++++++++++++--------
 .../tpch/queries/tpch-scan-range-lengths.test   | 28 ++++++++++++
 tests/query_test/test_scanners.py               | 31 ++++++++++++-
 3 files changed, 86 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/170956b8/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 9d95b0b..6757ec3 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -206,27 +206,36 @@ Status BaseSequenceScanner::GetNextInternal(RowBatch* row_batch) {
 }
 
 Status BaseSequenceScanner::ReadSync() {
-  uint8_t* hash;
-  int64_t out_len;
-  bool success = stream_->GetBytes(SYNC_HASH_SIZE, &hash, &out_len, &parse_status_);
-  // We are done when we read a sync marker occurring completely in the next scan range.
-  eos_ = stream_->eosr() || stream_->eof();
-  if (!success) return parse_status_;
-  if (out_len != SYNC_HASH_SIZE) {
-    return Status(Substitute("Hit end of stream after reading $0 bytes of $1-byte "
-        "synchronization marker", out_len, SYNC_HASH_SIZE));
-  } else if (memcmp(hash, header_->sync, SYNC_HASH_SIZE) != 0) {
-    stringstream ss;
-    ss  << "Bad synchronization marker" << endl
-        << "  Expected: '"
-        << ReadWriteUtil::HexDump(header_->sync, SYNC_HASH_SIZE) << "'" << endl
-        << "  Actual:   '"
-        << ReadWriteUtil::HexDump(hash, SYNC_HASH_SIZE) << "'";
-    return Status(ss.str());
+  DCHECK(!eos_);
+  if (stream_->eosr()) {
+    // Either we're at the end of file or the next sync marker is completely in the next
+    // scan range.
+    eos_ = true;
+  } else {
+    // Not at end of scan range or file - we expect there to be another sync marker, which
+    // is either followed by another block or the end of the file.
+    uint8_t* hash;
+    int64_t out_len;
+    bool success = stream_->GetBytes(SYNC_HASH_SIZE, &hash, &out_len, &parse_status_);
+    if (!success) return parse_status_;
+    if (out_len != SYNC_HASH_SIZE) {
+      return Status(Substitute("Hit end of stream after reading $0 bytes of $1-byte "
+          "synchronization marker", out_len, SYNC_HASH_SIZE));
+    } else if (memcmp(hash, header_->sync, SYNC_HASH_SIZE) != 0) {
+      stringstream ss;
+      ss  << "Bad synchronization marker" << endl
+          << "  Expected: '"
+          << ReadWriteUtil::HexDump(header_->sync, SYNC_HASH_SIZE) << "'" << endl
+          << "  Actual:   '"
+          << ReadWriteUtil::HexDump(hash, SYNC_HASH_SIZE) << "'";
+      return Status(ss.str());
+    }
+    // If we read the sync marker at end of file then we're done!
+    eos_ = stream_->eof();
+    ++num_syncs_;
+    block_start_ = stream_->file_offset();
   }
   total_block_size_ += stream_->file_offset() - block_start_;
-  block_start_ = stream_->file_offset();
-  ++num_syncs_;
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/170956b8/testdata/workloads/tpch/queries/tpch-scan-range-lengths.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/queries/tpch-scan-range-lengths.test b/testdata/workloads/tpch/queries/tpch-scan-range-lengths.test
new file mode 100644
index 0000000..7e2d40b
--- /dev/null
+++ b/testdata/workloads/tpch/queries/tpch-scan-range-lengths.test
@@ -0,0 +1,28 @@
+====
+---- QUERY
+# Check number of rows.
+select count(*)
+from lineitem
+---- RESULTS
+6001215
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Check values in numeric column.
+select sum(l_quantity)
+from lineitem
+---- RESULTS
+153078795.00
+---- TYPES
+DECIMAL
+====
+---- QUERY
+# Checksum values in string column.
+select sum(murmur_hash(l_comment))
+from lineitem
+---- RESULTS
+54541443255294703
+---- TYPES
+BIGINT
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/170956b8/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index bd3c286..b4e77be 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -740,7 +740,8 @@ class TestParquet(ImpalaTestSuite):
 # 2. scan range with no tuple
 # 3. tuple that span across multiple scan ranges
 # 4. scan range length = 16 for ParseSse() execution path
-MAX_SCAN_RANGE_LENGTHS = [0, 1, 2, 5, 16, 17, 32]
+# 5. scan range fits at least one row
+MAX_SCAN_RANGE_LENGTHS = [0, 1, 2, 5, 16, 17, 32, 512]
 
 class TestScanRangeLengths(ImpalaTestSuite):
   @classmethod
@@ -758,6 +759,34 @@ class TestScanRangeLengths(ImpalaTestSuite):
         vector.get_value('max_scan_range_length')
     self.run_test_case('QueryTest/hdfs-tiny-scan', vector)
 
+
+# Scan range lengths for TPC-H data sets. Test larger scan range sizes. Random
+# variation to the length is added by the test in order to exercise edge cases.
+TPCH_SCAN_RANGE_LENGTHS = [128 * 1024, 16 * 1024 * 1024]
+
+class TestTpchScanRangeLengths(ImpalaTestSuite):
+  """Exercise different scan range lengths on the larger TPC-H data sets."""
+  @classmethod
+  def get_workload(cls):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTpchScanRangeLengths, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('scan_range_length', *TPCH_SCAN_RANGE_LENGTHS))
+    # IMPALA-7360: sequence file scan returns spurious errors
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format != 'seq')
+
+  def test_tpch_scan_ranges(self, vector):
+    # Randomly adjust the scan range length to exercise different code paths.
+    max_scan_range_length = \
+        int(vector.get_value('scan_range_length') * (random.random() + 0.5))
+    LOG.info("max_scan_range_length={0}".format(max_scan_range_length))
+    vector.get_value('exec_option')['max_scan_range_length'] = max_scan_range_length
+    self.run_test_case('tpch-scan-range-lengths', vector)
+
 # More tests for text scanner
 # 1. Test file that ends w/o tuple delimiter
 # 2. Test file with escape character


[4/5] impala git commit: IMPALA-7225: REFRESH..PARTITION shoud not reset partition's num rows

Posted by ta...@apache.org.
IMPALA-7225: REFRESH..PARTITION shoud not reset partition's num rows

The bug was that HdfsTable#reloadPartition() was not setting the
partition's numRows_ after reloading the partition.

Fix: Move the code to set numRows_ to createPartition() call so that
the callers need not explicitly set it after calling createPartition().
Additionally consolidates other common code in createPartition().

Added a test.

Change-Id: I5886684e26b453c76f331f6e807f813146c6bf3a
Reviewed-on: http://gerrit.cloudera.org:8080/11056
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3bbfbb5e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3bbfbb5e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3bbfbb5e

Branch: refs/heads/master
Commit: 3bbfbb5ed90f67de1fa0c343e6a8c63e796351f6
Parents: 729e1fb
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Wed Jul 25 23:52:53 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Jul 27 00:11:29 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/HdfsTable.java    | 38 +++++++-------------
 tests/metadata/test_refresh_partition.py        | 29 +++++++++++++++
 2 files changed, 42 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3bbfbb5e/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 4ae1f9b..0f30407 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -852,19 +852,6 @@ public class HdfsTable extends Table implements FeFsTable {
         // If the partition is null, its HDFS path does not exist, and it was not added
         // to this table's partition list. Skip the partition.
         if (partition == null) continue;
-        if (msPartition.getParameters() != null) {
-          partition.setNumRows(FeCatalogUtils.getRowCount(msPartition.getParameters()));
-        }
-        if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
-          // TODO: READ_ONLY isn't exactly correct because the it's possible the
-          // partition does not have READ permissions either. When we start checking
-          // whether we can READ from a table, this should be updated to set the
-          // table's access level to the "lowest" effective level across all
-          // partitions. That is, if one partition has READ_ONLY and another has
-          // WRITE_ONLY the table's access level should be NONE.
-          accessLevel_ = TAccessLevel.READ_ONLY;
-        }
-
         Path partDir = FileSystemUtil.createFullyQualifiedPath(
             new Path(msPartition.getSd().getLocation()));
         List<HdfsPartition> parts = partsByPath.get(partDir);
@@ -1096,6 +1083,19 @@ public class HdfsTable extends Table implements FeFsTable {
           new HdfsPartition(this, msPartition, keyValues, fileFormatDescriptor,
           new ArrayList<FileDescriptor>(), getAvailableAccessLevel(fs, partDirPath));
       partition.checkWellFormed();
+      // Set the partition's #rows.
+      if (msPartition != null && msPartition.getParameters() != null) {
+         partition.setNumRows(FeCatalogUtils.getRowCount(msPartition.getParameters()));
+      }
+      if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
+          // TODO: READ_ONLY isn't exactly correct because the it's possible the
+          // partition does not have READ permissions either. When we start checking
+          // whether we can READ from a table, this should be updated to set the
+          // table's access level to the "lowest" effective level across all
+          // partitions. That is, if one partition has READ_ONLY and another has
+          // WRITE_ONLY the table's access level should be NONE.
+          accessLevel_ = TAccessLevel.READ_ONLY;
+      }
       return partition;
     } catch (IOException e) {
       throw new CatalogException("Error initializing partition", e);
@@ -1660,18 +1660,6 @@ public class HdfsTable extends Table implements FeFsTable {
       // If the partition is null, its HDFS path does not exist, and it was not added to
       // this table's partition list. Skip the partition.
       if (partition == null) continue;
-      if (msPartition.getParameters() != null) {
-        partition.setNumRows(FeCatalogUtils.getRowCount(msPartition.getParameters()));
-      }
-      if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
-        // TODO: READ_ONLY isn't exactly correct because the it's possible the
-        // partition does not have READ permissions either. When we start checking
-        // whether we can READ from a table, this should be updated to set the
-        // table's access level to the "lowest" effective level across all
-        // partitions. That is, if one partition has READ_ONLY and another has
-        // WRITE_ONLY the table's access level should be NONE.
-        accessLevel_ = TAccessLevel.READ_ONLY;
-      }
       refreshPartitionFileMetadata(partition);
     }
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/3bbfbb5e/tests/metadata/test_refresh_partition.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_refresh_partition.py b/tests/metadata/test_refresh_partition.py
index 187055c..2c31b93 100644
--- a/tests/metadata/test_refresh_partition.py
+++ b/tests/metadata/test_refresh_partition.py
@@ -44,6 +44,35 @@ class TestRefreshPartition(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
         create_uncompressed_text_dimension(cls.get_workload()))
 
+  def test_refresh_partition_num_rows(self, vector, unique_database):
+    """Refreshing a partition should not change it's numRows stat."""
+    # Create a partitioned table and add data to it.
+    tbl = unique_database + ".t1"
+    self.client.execute("create table %s(a int) partitioned by (b int)" % tbl)
+    self.client.execute("insert into %s partition(b=1) values (1)" % tbl)
+    # Compute stats on tbl. It should populate the partition num rows.
+    self.client.execute("compute stats %s" % tbl)
+    result = self.client.execute("show partitions %s" % tbl)
+    # Format: partition/#Rows/#Files (first 3 entries)
+    assert result.get_data().startswith("1\t1\t1"),\
+        "Incorrect partition stats %s" % result.get_data()
+    # Add another file to the same partition using hive.
+    self.run_stmt_in_hive("insert into table %s partition (b=1) values (2)" % tbl)
+    # Make sure Impala still sees a single row.
+    assert "1" == self.client.execute("select count(*) from %s" % tbl).get_data()
+    # refresh the partition and make sure the new row is visible
+    self.client.execute("refresh %s partition (b=1)" % tbl)
+    assert "2" == self.client.execute("select count(*) from %s" % tbl).get_data()
+    # Make sure the partition num rows are unchanged and still 1 but the #files is updated.
+    result = self.client.execute("show partitions %s" % tbl)
+    assert result.get_data().startswith("1\t1\t2"),\
+        "Incorrect partition stats %s" % result.get_data()
+    # Do a full table refresh and it should still remain the same.
+    self.client.execute("refresh %s" % tbl)
+    result = self.client.execute("show partitions %s" % tbl)
+    assert result.get_data().startswith("1\t1\t2"),\
+        "Incorrect partition stats %s" % result.get_data()
+
   def test_add_hive_partition_and_refresh(self, vector, unique_database):
     """
     Partition added in Hive can be viewed in Impala after refreshing