Posted to commits@impala.apache.org by tm...@apache.org on 2019/08/27 22:26:24 UTC

[impala] branch master updated (3f4cbe9 -> 3209055)

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 3f4cbe9  IMPALA-8889: Fix error messages for unsupported operations on acid tables
     new db00748  IMPALA-8896: fix alterPartitionsWithTransaction() that emitted too many alter tables.
     new e40d24e  IMPALA-8793: Implement TRUNCATE for insert-only ACID tables
     new eea617b  IMPALA-8890: Advance read page in UnpinStream
     new 3209055  Fix THttpServer to not call the cookie function with an empty cookie

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/analytic-eval-node.cc                  |   3 +-
 be/src/exec/grouping-aggregator-partition.cc       |   5 +-
 be/src/exec/grouping-aggregator.cc                 |  16 ++-
 be/src/exec/grouping-aggregator.h                  |   2 +-
 be/src/exec/partitioned-hash-join-builder.cc       |   5 +-
 be/src/exec/partitioned-hash-join-node.cc          |  19 ++--
 be/src/runtime/buffered-tuple-stream-test.cc       |  78 ++++++++++++-
 be/src/runtime/buffered-tuple-stream.cc            |  10 +-
 be/src/runtime/buffered-tuple-stream.h             |   2 +-
 be/src/runtime/spillable-row-batch-queue.cc        |   3 +-
 be/src/transport/THttpServer.cpp                   |  14 ++-
 .../org/apache/impala/compat/MetastoreShim.java    |  10 +-
 .../org/apache/impala/compat/MetastoreShim.java    |  19 +---
 .../org/apache/impala/analysis/TruncateStmt.java   |   1 -
 .../java/org/apache/impala/catalog/Catalog.java    |  56 +++++++++-
 .../org/apache/impala/catalog/Transaction.java     |  73 ++++++++++++
 .../apache/impala/service/CatalogOpExecutor.java   | 122 ++++++++++++++++++---
 .../java/org/apache/impala/util/AcidUtils.java     |  11 +-
 .../org/apache/impala/analysis/AnalyzerTest.java   |   3 +-
 .../queries/QueryTest/acid-truncate.test           |  97 ++++++++++++++++
 tests/metadata/test_hms_integration.py             |  20 ++++
 tests/query_test/test_acid.py                      |  11 ++
 22 files changed, 495 insertions(+), 85 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/catalog/Transaction.java
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/acid-truncate.test


[impala] 02/04: IMPALA-8793: Implement TRUNCATE for insert-only ACID tables

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e40d24ee0aa2f8ca928618eb6d86728ee5422958
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Aug 7 13:54:04 2019 +0200

    IMPALA-8793: Implement TRUNCATE for insert-only ACID tables
    
    This commit adds support for the TRUNCATE statement on transactional
    tables. TRUNCATE on a transactional table doesn't remove any files; it
    just creates new, empty ACID base directories.
    
    From a filesystem point of view the directories won't be completely
    empty, though: each contains a hidden '_empty' file so that
    FileSystemUtil.listFiles() still lists it. This is similar to Hive's
    behavior of creating the magic file '_orc_acid_version' in each
    transactional directory, except that Impala only creates these
    '_empty' files in empty base directories.
    
    All of the functionality is implemented in the catalog, so transaction
    handling, table locking, and heartbeating also happen there. Added a
    new Transaction class that prevents transactions from leaking; it
    should be used in try-with-resources statements (see the usage sketch
    below).
    
    Testing:
    Added tests that truncate both non-partitioned and partitioned tables.
    The tests check that the statistics were removed and that Hive sees
    the effect of the truncation.
    
    Change-Id: Ic749b7f27da157e1c0ebf9b7e9b6ee09afad122a
    Reviewed-on: http://gerrit.cloudera.org:8080/14071
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/compat/MetastoreShim.java    |  10 +-
 .../org/apache/impala/compat/MetastoreShim.java    |  11 +-
 .../org/apache/impala/analysis/TruncateStmt.java   |   1 -
 .../java/org/apache/impala/catalog/Catalog.java    |  56 +++++++++-
 .../org/apache/impala/catalog/Transaction.java     |  73 ++++++++++++
 .../apache/impala/service/CatalogOpExecutor.java   | 122 ++++++++++++++++++---
 .../java/org/apache/impala/util/AcidUtils.java     |  11 +-
 .../org/apache/impala/analysis/AnalyzerTest.java   |   3 +-
 .../queries/QueryTest/acid-truncate.test           |  97 ++++++++++++++++
 tests/metadata/test_hms_integration.py             |  20 ++++
 tests/query_test/test_acid.py                      |  11 ++
 11 files changed, 368 insertions(+), 47 deletions(-)
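
As context for the catalog changes below, here is a condensed, non-runnable
sketch of how the new pieces fit together in truncateTransactionalTable()
(it assumes the catalog_ field, the metastore table 'msTbl' and the partition
list 'parts' from CatalogOpExecutor, all shown in full in the diff):

    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
      IMetaStoreClient hmsClient = msClient.getHiveClient();
      HeartbeatContext ctx = new HeartbeatContext("Truncate table db.tbl",
          System.nanoTime());
      try (Transaction txn = catalog_.openTransaction(hmsClient, ctx)) {
        // Exclusive table lock tied to the open transaction.
        catalog_.lockTableInTransaction("db", "tbl", txn,
            DataOperationType.DELETE, ctx);
        // Allocate a write id for this table within the transaction.
        TblTransaction tblTxn = MetastoreShim.createTblTransaction(
            hmsClient, msTbl, txn.getId());
        // New empty base directories (base_<writeId>/_empty) per partition.
        createEmptyBaseDirectories(parts, tblTxn.writeId);
        txn.commit();
      }
    }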

diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
index 07d2aaf..83c95df 100644
--- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
@@ -65,6 +65,7 @@ import org.apache.impala.service.Frontend;
 import org.apache.impala.service.MetadataOp;
 import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.util.AcidUtils.TblTransaction;
 import org.apache.thrift.TException;
 
 /**
@@ -73,15 +74,6 @@ import org.apache.thrift.TException;
  */
 public class MetastoreShim {
 
-  /**
-   * Empty class, should not be instantiated.
-   */
-  public static class TblTransaction {
-    public TblTransaction() {
-      throw new UnsupportedOperationException("new TblTransaction");
-    }
-  }
-
   public static TblTransaction createTblTransaction(
      IMetaStoreClient client, Table tbl, long txnId) {
     throw new UnsupportedOperationException("createTblTransaction");
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index d5a3da6..da33946 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -91,6 +91,7 @@ import org.apache.impala.service.Frontend;
 import org.apache.impala.service.MetadataOp;
 import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.util.AcidUtils.TblTransaction;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
@@ -132,16 +133,6 @@ public class MetastoreShim {
   public static String TRANSACTION_USER_ID = "Impala";
 
   /**
-   * Transaction parameters needed for single table operations.
-   */
-  public static class TblTransaction {
-    public long txnId;
-    public boolean ownsTxn;
-    public long writeId;
-    public String validWriteIds;
-  }
-
-  /**
    * Initializes and returns a TblTransaction object for table 'tbl'.
    * Opens a new transaction if txnId is not valid.
    */
diff --git a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
index b08b3ca..60d6bf2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
@@ -68,7 +68,6 @@ public class TruncateStmt extends StatementBase {
           "TRUNCATE TABLE not supported on non-HDFS table: %s", table_.getFullName()));
     }
     analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
-    analyzer.ensureTableNotTransactional(table_, "TRUNCATE TABLE");
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index e73de73..02e2a22 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -26,11 +26,11 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockLevel;
 import org.apache.hadoop.hive.metastore.api.LockType;
-
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
@@ -667,26 +667,70 @@ public abstract class Catalog implements AutoCloseable {
   }
 
   /**
+   * Opens a transaction and returns a Transaction object that can be used in a
+   * try-with-resources statement. That way transactions won't leak.
+   * @param hmsClient the client towards HMS.
+   * @param ctx Context for heartbeating.
+   * @return an AutoCloseable transaction object.
+   * @throws TransactionException
+   */
+  public Transaction openTransaction(IMetaStoreClient hmsClient, HeartbeatContext ctx)
+      throws TransactionException {
+    return new Transaction(hmsClient, transactionKeepalive_, "Impala Catalog", ctx);
+  }
+
+  /**
    * Creates an exclusive lock for a particular table and acquires it in the HMS. Starts
    * heartbeating the lock. This function is for locks that doesn't belong to a
-   * transaction.
+   * transaction. The client of this function is responsible for calling
+   * 'releaseTableLock()'.
    * @param dbName Name of the DB where the particular table is.
    * @param tableName Name of the table where the lock is acquired.
    * @throws TransactionException
    */
-  public long lockTable(String dbName, String tableName, HeartbeatContext ctx)
+  public long lockTableStandalone(String dbName, String tableName, HeartbeatContext ctx)
       throws TransactionException {
+    return lockTableInternal(dbName, tableName, 0L, DataOperationType.NO_TXN, ctx);
+  }
+
+  /**
+   * Creates an exclusive lock for a particular table and acquires it in the HMS.
+   * This function can only be invoked in a transaction context, i.e. 'txnId'
+   * cannot be 0.
+   * @param dbName Name of the DB where the particular table is.
+   * @param tableName Name of the table where the lock is acquired.
+   * @param transaction the transaction that needs to lock the table.
+   * @throws TransactionException
+   */
+  public void lockTableInTransaction(String dbName, String tableName,
+      Transaction transaction, DataOperationType opType, HeartbeatContext ctx)
+      throws TransactionException {
+    Preconditions.checkState(transaction.getId() > 0);
+    lockTableInternal(dbName, tableName, transaction.getId(), opType, ctx);
+  }
+
+  /**
+   * Creates an exclusive lock for a particular table and acquires it in the HMS. Starts
+   * heartbeating the lock if it doesn't have a transaction context.
+   * @param dbName Name of the DB where the particular table is.
+   * @param tableName Name of the table where the lock is acquired.
+   * @param txnId id of the transaction, 0 for standalone locks.
+   * @throws TransactionException
+   */
+  private long lockTableInternal(String dbName, String tableName, long txnId,
+      DataOperationType opType, HeartbeatContext ctx) throws TransactionException {
+    Preconditions.checkState(txnId >= 0);
     LockComponent lockComponent = new LockComponent();
     lockComponent.setDbname(dbName);
     lockComponent.setTablename(tableName);
     lockComponent.setLevel(LockLevel.TABLE);
     lockComponent.setType(LockType.EXCLUSIVE);
-    lockComponent.setOperationType(DataOperationType.NO_TXN);
+    lockComponent.setOperationType(opType);
     List<LockComponent> lockComponents = Arrays.asList(lockComponent);
     long lockId = -1L;
     try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
-      lockId = MetastoreShim.acquireLock(client.getHiveClient(), 0L, lockComponents);
-      transactionKeepalive_.addLock(lockId, ctx);
+      lockId = MetastoreShim.acquireLock(client.getHiveClient(), txnId, lockComponents);
+      if (txnId == 0L) transactionKeepalive_.addLock(lockId, ctx);
     }
     return lockId;
   }
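
A brief sketch of how the standalone variant above is meant to be used (the
in-transaction variant appears in the truncate sketch earlier; note that
releaseTableLock(lockId) is assumed here based on the javadoc, its exact
signature is not part of this diff):

    // Standalone lock (no transaction): heartbeated via the keepalive, and
    // the caller is responsible for releasing it.
    long lockId = catalog_.lockTableStandalone(dbName, tableName, ctx);
    try {
      // ... perform the metadata operation, e.g. drop the table in HMS ...
    } finally {
      catalog_.releaseTableLock(lockId);  // assumed API, per the javadoc above
    }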
diff --git a/fe/src/main/java/org/apache/impala/catalog/Transaction.java b/fe/src/main/java/org/apache/impala/catalog/Transaction.java
new file mode 100644
index 0000000..044a1c1
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/Transaction.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import org.apache.curator.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.impala.common.TransactionException;
+import org.apache.impala.common.TransactionKeepalive;
+import org.apache.impala.common.TransactionKeepalive.HeartbeatContext;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.log4j.Logger;
+
+/**
+ * Transaction class that implements the AutoCloseable interface and hence the callers
+ * should use the try-with-resources statement while creating an instance. In its
+ * constructor it creates a transaction and also registers it for heartbeating.
+ * In close() it aborts the transaction if it wasn't committed earlier.
+ */
+public class Transaction implements AutoCloseable {
+  private static final Logger LOG = Logger.getLogger(Transaction.class);
+
+  private long transactionId_ = -1;
+  private IMetaStoreClient hmsClient_;
+  private TransactionKeepalive keepalive_;
+
+  public Transaction(IMetaStoreClient hmsClient, TransactionKeepalive keepalive,
+      String user, HeartbeatContext ctx)
+      throws TransactionException {
+    Preconditions.checkNotNull(hmsClient);
+    Preconditions.checkNotNull(keepalive);
+    hmsClient_ = hmsClient;
+    keepalive_ = keepalive;
+    transactionId_ = MetastoreShim.openTransaction(hmsClient_);
+    keepalive_.addTransaction(transactionId_, ctx);
+  }
+
+  public long getId() { return transactionId_; }
+
+  public void commit() throws TransactionException {
+    Preconditions.checkState(transactionId_ > 0);
+    keepalive_.deleteTransaction(transactionId_);
+    MetastoreShim.commitTransaction(hmsClient_, transactionId_);
+    transactionId_ = -1;
+  }
+
+  @Override
+  public void close() {
+    if (transactionId_ <= 0) return;
+
+    keepalive_.deleteTransaction(transactionId_);
+    try {
+      MetastoreShim.abortTransaction(hmsClient_, transactionId_);
+    } catch (TransactionException e) {
+      LOG.error("Cannot abort transaction with id " + String.valueOf(transactionId_), e);
+    }
+    transactionId_ = -1;
+  }
+}
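
To make the leak-prevention guarantee of the class above concrete, a minimal
usage sketch (doTransactionalWork() is a hypothetical placeholder):

    try (Transaction txn = catalog_.openTransaction(hmsClient, ctx)) {
      doTransactionalWork(txn.getId());  // hypothetical; may throw
      txn.commit();  // removes the keepalive entry and commits in HMS
    }
    // If commit() succeeded, close() is a no-op (transactionId_ is -1).
    // If an exception was thrown before commit(), close() aborts the
    // transaction in HMS, so it cannot leak on error paths.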
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index c6b86be..64b1ae1 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -26,19 +26,21 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import java.util.stream.Collectors;
+
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -89,6 +92,7 @@ import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.TableNotFoundException;
+import org.apache.impala.catalog.Transaction;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
@@ -173,6 +177,7 @@ import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdateCatalogResponse;
 import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.AcidUtils.TblTransaction;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.KuduUtil;
@@ -949,7 +954,7 @@ public class CatalogOpExecutor {
     //       see other ways to get a new write id (which is needed to update
     //       transactional tables). Hive seems to use internal API for this.
     //       See IMPALA-8865 about plans to improve this.
-    MetastoreShim.TblTransaction tblTxn = null;
+    TblTransaction tblTxn = null;
     try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       try {
         if (AcidUtils.isTransactionalTable(msTbl.getParameters())) {
@@ -974,7 +979,7 @@ public class CatalogOpExecutor {
       org.apache.hadoop.hive.metastore.api.Table msTbl,
       TAlterTableUpdateStatsParams params,
       Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns,
-      MetaStoreClient msClient, MetastoreShim.TblTransaction tblTxn)
+      MetaStoreClient msClient, TblTransaction tblTxn)
       throws ImpalaException {
     // Update column stats.
     numUpdatedColumns.setRef(0L);
@@ -1624,7 +1629,7 @@ public class CatalogOpExecutor {
       HeartbeatContext ctx = new HeartbeatContext(
           String.format("Drop table/view %s.%s", tableName.getDb(), tableName.getTbl()),
           System.nanoTime());
-      lockId = catalog_.lockTable(tableName.getDb(), tableName.getTbl(), ctx);
+      lockId = catalog_.lockTableStandalone(tableName.getDb(), tableName.getTbl(), ctx);
     }
 
     try {
@@ -1803,15 +1808,19 @@ public class CatalogOpExecutor {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       try {
-        HdfsTable hdfsTable = (HdfsTable)table;
-        Collection<? extends FeFsPartition> parts =
-            FeCatalogUtils.loadAllPartitions(hdfsTable);
-        for (FeFsPartition part: parts) {
-          FileSystemUtil.deleteAllVisibleFiles(new Path(part.getLocation()));
+        if (MetastoreShim.getMajorVersion() > 2 && AcidUtils.isTransactionalTable(
+            table.getMetaStoreTable().getParameters())) {
+          truncateTransactionalTable(params, table);
+        } else {
+          HdfsTable hdfsTable = (HdfsTable)table;
+          Collection<? extends FeFsPartition> parts =
+              FeCatalogUtils.loadAllPartitions(hdfsTable);
+          for (FeFsPartition part: parts) {
+            FileSystemUtil.deleteAllVisibleFiles(new Path(part.getLocation()));
+          }
+          dropColumnStats(table);
+          dropTableStats(table);
         }
-
-        dropColumnStats(table);
-        dropTableStats(table);
       } catch (Exception e) {
         String fqName = tblName.db_name + "." + tblName.table_name;
         throw new CatalogException(String.format("Failed to truncate table: %s.\n" +
@@ -1827,6 +1836,84 @@ public class CatalogOpExecutor {
     }
   }
 
+  /**
+   * Truncates a transactional table. It creates new empty base directories in all
+   * partitions of the table. That way queries started earlier can still read a
+   * valid snapshot version of the data. HMS's cleaner should remove obsolete
+   * directories later.
+   * After that empty directory creation it removes stats-related parameters of
+   * the table and its partitions.
+   */
+  private void truncateTransactionalTable(TTruncateParams params, Table table)
+      throws ImpalaException {
+    TableName tblName = TableName.fromThrift(params.getTable_name());
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      IMetaStoreClient hmsClient = msClient.getHiveClient();
+      HeartbeatContext ctx = new HeartbeatContext(
+          String.format("Truncate table %s.%s", tblName.getDb(), tblName.getTbl()),
+          System.nanoTime());
+      try (Transaction txn = catalog_.openTransaction(hmsClient, ctx)) {
+        Preconditions.checkState(txn.getId() > 0);
+        catalog_.lockTableInTransaction(tblName.getDb(), tblName.getTbl(), txn,
+            DataOperationType.DELETE, ctx);
+        TblTransaction tblTxn = MetastoreShim.createTblTransaction(hmsClient,
+            table.getMetaStoreTable(), txn.getId());
+        HdfsTable hdfsTable = (HdfsTable)table;
+        Collection<? extends FeFsPartition> parts =
+            FeCatalogUtils.loadAllPartitions(hdfsTable);
+        createEmptyBaseDirectories(parts, tblTxn.writeId);
+        // Currently Impala cannot update the statistics properly. So instead of
+        // writing correct stats, let's just remove COLUMN_STATS_ACCURATE parameter from
+        // each partition.
+        // TODO(IMPALA-8883): properly update statistics
+        List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitions =
+            Lists.newArrayListWithCapacity(parts.size());
+        if (table.getNumClusteringCols() > 0) {
+          for (FeFsPartition part: parts) {
+            org.apache.hadoop.hive.metastore.api.Partition hmsPart =
+                ((HdfsPartition)part).toHmsPartition();
+            Preconditions.checkNotNull(hmsPart);
+            if (hmsPart.getParameters() != null) {
+              hmsPart.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+              hmsPartitions.add(hmsPart);
+            }
+          }
+        }
+        // For partitioned tables we need to alter all the partitions in HMS.
+        if (!hmsPartitions.isEmpty()) {
+          unsetPartitionsColStats(table.getMetaStoreTable(), hmsPartitions, tblTxn);
+        }
+        // Remove COLUMN_STATS_ACCURATE property from the table.
+        unsetTableColStats(table.getMetaStoreTable(), tblTxn);
+        txn.commit();
+      }
+    } catch (Exception e) {
+      throw new ImpalaRuntimeException(
+          String.format(HMS_RPC_ERROR_FORMAT_STR, "truncateTable"), e);
+    }
+  }
+
+  /**
+   * Creates new empty base directories for an ACID table. The directories won't be
+   * really empty, they will contain the hidden "_empty" file. It's needed because
+   * FileSystemUtil.listFiles() doesn't see empty directories. See IMPALA-8739.
+   * @param partitions the partitions in which we create new directories.
+   * @param writeId the write id of the new base directory.
+   * @throws IOException
+   */
+  private void createEmptyBaseDirectories(
+      Collection<? extends FeFsPartition> partitions, long writeId) throws IOException {
+    for (FeFsPartition part: partitions) {
+      Path partPath = new Path(part.getLocation());
+      FileSystem fs = FileSystemUtil.getFileSystemForPath(partPath);
+      String baseDirStr =
+          part.getLocation() + Path.SEPARATOR + "base_" + String.valueOf(writeId);
+      fs.mkdirs(new Path(baseDirStr));
+      String emptyFile = baseDirStr + Path.SEPARATOR + "_empty";
+      fs.create(new Path(emptyFile));
+    }
+  }
+
   private void dropFunction(TDropFunctionParams params, TDdlExecResponse resp)
       throws ImpalaException {
     FunctionName fName = FunctionName.fromThrift(params.fn_name);
@@ -3375,7 +3462,7 @@ public class CatalogOpExecutor {
    * call.
    */
   private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
-      boolean overwriteLastDdlTime, MetastoreShim.TblTransaction tblTxn)
+      boolean overwriteLastDdlTime, TblTransaction tblTxn)
       throws ImpalaRuntimeException {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       if (overwriteLastDdlTime) {
@@ -3550,8 +3637,7 @@ public class CatalogOpExecutor {
    * timeouts.
    */
   private void bulkAlterPartitions(Table tbl, List<HdfsPartition> modifiedParts,
-      MetastoreShim.TblTransaction tblTxn)
-      throws ImpalaException {
+      TblTransaction tblTxn) throws ImpalaException {
     List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitions =
         Lists.newArrayList();
     for (HdfsPartition p: modifiedParts) {
@@ -3808,7 +3894,7 @@ public class CatalogOpExecutor {
         = table.getMetrics().getTimer(HdfsTable.CATALOG_UPDATE_DURATION_METRIC).time();
 
     long transactionId = -1;
-    MetastoreShim.TblTransaction tblTxn = null;
+    TblTransaction tblTxn = null;
     if (update.isSetTransaction_id()) {
       transactionId = update.getTransaction_id();
       Preconditions.checkState(transactionId > 0);
@@ -4370,7 +4456,7 @@ public class CatalogOpExecutor {
    * Update table properties to remove the COLUMN_STATS_ACCURATE entry if it exists.
    */
   private void unsetTableColStats(org.apache.hadoop.hive.metastore.api.Table msTable,
-      MetastoreShim.TblTransaction tblTxn) throws ImpalaRuntimeException{
+      TblTransaction tblTxn) throws ImpalaRuntimeException{
     Map<String, String> params = msTable.getParameters();
     if (params != null && params.containsKey(StatsSetupConst.COLUMN_STATS_ACCURATE)) {
       params.remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
@@ -4385,7 +4471,7 @@ public class CatalogOpExecutor {
    */
   private void unsetPartitionsColStats(org.apache.hadoop.hive.metastore.api.Table msTable,
       List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitionsStatsUnset,
-      MetastoreShim.TblTransaction tblTxn) throws ImpalaRuntimeException{
+      TblTransaction tblTxn) throws ImpalaRuntimeException{
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       try {
         if (tblTxn != null) {
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index a27888d..29db92f 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -18,7 +18,6 @@ package org.apache.impala.util;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-
 import com.google.errorprone.annotations.Immutable;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +52,16 @@ public class AcidUtils {
   public static final String TABLE_IS_TRANSACTIONAL = "transactional";
   public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
 
+  /**
+   * Transaction parameters needed for single table operations.
+   */
+  public static class TblTransaction {
+    public long txnId;
+    public boolean ownsTxn;
+    public long writeId;
+    public String validWriteIds;
+  }
+
   // Regex pattern for files in base directories. The pattern matches strings like
   // "base_0000005/abc.txt",
   // "base_0000005/0000/abc.txt",
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index 4c042fe..ddfd192 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -581,8 +581,7 @@ public class AnalyzerTest extends FrontendTestBase {
     AnalysisError(
         "truncate table functional_orc_def.full_transactional_table",
         errorMsg);
-    AnalysisError("truncate table functional.insert_only_transactional_table",
-        String.format(insertOnlyErrorMsg, "TRUNCATE TABLE"));
+    AnalyzesOk("truncate table functional.insert_only_transactional_table");
 
     AnalysisError(
         "alter table functional_orc_def.full_transactional_table " +
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-truncate.test b/testdata/workloads/functional-query/queries/QueryTest/acid-truncate.test
new file mode 100644
index 0000000..0e14f43
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-truncate.test
@@ -0,0 +1,97 @@
+====
+---- QUERY
+create table tt (x int) tblproperties (
+  'transactional'='true',
+  'transactional_properties'='insert_only');
+truncate table tt;
+select * from tt;
+---- RESULTS
+====
+---- QUERY
+insert into tt values (1);
+insert into tt values (2);
+insert into tt values (3);
+====
+---- QUERY
+select * from tt;
+---- RESULTS
+1
+2
+3
+====
+---- HIVE_QUERY
+use $DATABASE;
+analyze table tt compute statistics for columns;
+====
+---- QUERY
+invalidate metadata tt;
+show create table tt;
+---- RESULTS
+row_regex: .*COLUMN_STATS_ACCURATE.*
+====
+---- QUERY
+truncate table tt;
+select * from tt;
+---- RESULTS
+====
+---- QUERY
+show files in tt;
+---- RESULTS
+====
+---- QUERY
+show create table tt;
+---- RESULTS
+row_regex: (?!.*COLUMN_STATS_ACCURATE)
+====
+---- QUERY
+truncate table tt;
+insert into tt values (4);
+select * from tt;
+---- RESULTS
+4
+====
+---- QUERY
+# Create partitioned ACID table and use dynamic partitioning during insert.
+create table pt (i int)
+partitioned by (sp int, dp int) tblproperties (
+  'transactional'='true',
+  'transactional_properties'='insert_only');
+insert into table pt partition(sp=1, dp) select 10, 1 union select 20, 2;
+insert into table pt partition(sp=3, dp) select 30, 3;
+====
+---- QUERY
+select sp, dp, i from pt order by sp, dp, i;
+---- RESULTS
+1,1,10
+1,2,20
+3,3,30
+---- TYPES
+INT,INT,INT
+====
+---- QUERY
+truncate table pt;
+select * from pt;
+---- RESULTS
+====
+---- QUERY
+show files in pt;
+---- RESULTS
+====
+---- QUERY
+insert into table pt partition(sp=1, dp) select 11, 1 union select 21, 2;
+insert into table pt partition(sp=4, dp) select 40, 4;
+select sp, dp, i from pt order by sp, dp, i;
+---- RESULTS
+1,1,11
+1,2,21
+4,4,40
+---- TYPES
+INT,INT,INT
+====
+---- HIVE_QUERY
+use $DATABASE;
+analyze table pt compute statistics for columns;
+====
+---- QUERY
+truncate table pt;
+====
diff --git a/tests/metadata/test_hms_integration.py b/tests/metadata/test_hms_integration.py
index 59de250..90ee97d 100644
--- a/tests/metadata/test_hms_integration.py
+++ b/tests/metadata/test_hms_integration.py
@@ -751,6 +751,26 @@ class TestHmsIntegration(ImpalaTestSuite):
         "show tables in %s" % unique_database)
     assert "acid_insert" not in show_tables_result_after_drop
 
+  @SkipIfHive2.acid
+  def test_truncate_acid_table(self, vector, unique_database):
+    """
+    Tests that a transactional table truncated by Impala shows no rows when
+    queried by Hive.
+    """
+    table_name = "%s.acid_truncate" % unique_database
+    self.client.execute(
+      "create table %s (i int) "
+      "TBLPROPERTIES('transactional'='true', "
+      "'transactional_properties'='insert_only')" % table_name)
+    self.client.execute("insert into %s values (1), (2)" % table_name)
+    query_result = self.run_stmt_in_hive("select * from %s" % table_name)
+    assert "1" in query_result
+    assert "2" in query_result
+    self.client.execute("truncate table %s" % table_name)
+    query_result_after_truncate = self.run_stmt_in_hive("select count(*) from %s" %
+                                                        table_name)
+    assert "0" == query_result_after_truncate.split('\n')[1]
+
   @pytest.mark.execute_serially
   def test_change_table_name(self, vector):
     """
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index db42693..2468911 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -66,6 +66,17 @@ class TestAcid(ImpalaTestSuite):
   @SkipIfADLS.hive
   @SkipIfIsilon.hive
   @SkipIfLocal.hive
+  def test_acid_truncate(self, vector, unique_database):
+    self.run_test_case('QueryTest/acid-truncate', vector, use_db=unique_database)
+    assert "0" == self.run_stmt_in_hive("select count(*) from {0}.{1}".format(
+        unique_database, "pt")).split("\n")[1]
+
+  @SkipIfHive2.acid
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
   def test_acid_partitioned(self, vector, unique_database):
     self.run_test_case('QueryTest/acid-partitioned', vector, use_db=unique_database)
 


[impala] 03/04: IMPALA-8890: Advance read page in UnpinStream

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit eea617be8f3ffa72acdb4fb857803adb5ec1579a
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Aug 26 11:55:05 2019 -0700

    IMPALA-8890: Advance read page in UnpinStream
    
    Fixes a DCHECK in BufferedTupleStream::UnpinStream where
    ExpectedPinCount would fail if the stream still referenced a read page
    that had been completely read. The patch changes UnpinStream so that
    if the current read page has been completely read, it is advanced
    using NextReadPage().
    
    Testing:
    * Ran core tests
    * Added new tests to buffered-tuple-stream-test
    
    Change-Id: Id61a82559fd2baab237cc44f60a69cba2b30c16e
    Reviewed-on: http://gerrit.cloudera.org:8080/14144
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/analytic-eval-node.cc            |  3 +-
 be/src/exec/grouping-aggregator-partition.cc |  5 +-
 be/src/exec/grouping-aggregator.cc           | 16 +++---
 be/src/exec/grouping-aggregator.h            |  2 +-
 be/src/exec/partitioned-hash-join-builder.cc |  5 +-
 be/src/exec/partitioned-hash-join-node.cc    | 19 ++++---
 be/src/runtime/buffered-tuple-stream-test.cc | 78 +++++++++++++++++++++++++---
 be/src/runtime/buffered-tuple-stream.cc      | 10 +++-
 be/src/runtime/buffered-tuple-stream.h       |  2 +-
 be/src/runtime/spillable-row-batch-queue.cc  |  3 +-
 10 files changed, 114 insertions(+), 29 deletions(-)

diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 73b5346..b62f663 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -344,7 +344,8 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
     // TODO: Consider re-pinning later if the output stream is fully consumed.
     RETURN_IF_ERROR(status);
     RETURN_IF_ERROR(state_->StartSpilling(mem_tracker()));
-    input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+    RETURN_IF_ERROR(
+        input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
     VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
     if (!input_stream_->AddRow(row, &status)) {
       // Rows should be added in unpinned mode unless an error occurs.
diff --git a/be/src/exec/grouping-aggregator-partition.cc b/be/src/exec/grouping-aggregator-partition.cc
index a968ef0..03f54d8 100644
--- a/be/src/exec/grouping-aggregator-partition.cc
+++ b/be/src/exec/grouping-aggregator-partition.cc
@@ -185,9 +185,10 @@ Status GroupingAggregator::Partition::Spill(bool more_aggregate_rows) {
   DCHECK(aggregated_row_stream->has_write_iterator());
   DCHECK(!unaggregated_row_stream->has_write_iterator());
   if (more_aggregate_rows) {
-    aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+    RETURN_IF_ERROR(aggregated_row_stream->UnpinStream(
+        BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
   } else {
-    aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    RETURN_IF_ERROR(aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
     bool got_buffer;
     RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
     DCHECK(got_buffer) << "Accounted in min reservation"
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 358919a..5238c43 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -755,7 +755,7 @@ Status GroupingAggregator::BuildSpilledPartition(Partition** built_partition) {
   hash_partitions_.clear();
 
   if (dst_partition->is_spilled()) {
-    PushSpilledPartition(dst_partition);
+    RETURN_IF_ERROR(PushSpilledPartition(dst_partition));
     *built_partition = nullptr;
     // Spilled the partition - we should not be using any reservation except from
     // 'serialize_stream_'.
@@ -790,7 +790,8 @@ Status GroupingAggregator::RepartitionSpilledPartition() {
     if (!hash_partition->is_spilled()) continue;
     // The aggregated rows have been repartitioned. Free up at least a buffer's worth of
     // reservation and use it to pin the unaggregated write buffer.
-    hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    RETURN_IF_ERROR(hash_partition->aggregated_row_stream->UnpinStream(
+        BufferedTupleStream::UNPIN_ALL));
     bool got_buffer;
     RETURN_IF_ERROR(
         hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
@@ -907,7 +908,7 @@ Status GroupingAggregator::MoveHashPartitions(int64_t num_input_rows) {
     if (total_rows == 0) {
       partition->Close(false);
     } else if (partition->is_spilled()) {
-      PushSpilledPartition(partition);
+      RETURN_IF_ERROR(PushSpilledPartition(partition));
     } else {
       aggregated_partitions_.push_back(partition);
     }
@@ -917,15 +918,18 @@ Status GroupingAggregator::MoveHashPartitions(int64_t num_input_rows) {
   return Status::OK();
 }
 
-void GroupingAggregator::PushSpilledPartition(Partition* partition) {
+Status GroupingAggregator::PushSpilledPartition(Partition* partition) {
   DCHECK(partition->is_spilled());
   DCHECK(partition->hash_tbl == nullptr);
   // Ensure all pages in the spilled partition's streams are unpinned by invalidating
   // the streams' read and write iterators. We may need all the memory to process the
   // next spilled partitions.
-  partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
-  partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+  RETURN_IF_ERROR(
+      partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+  RETURN_IF_ERROR(
+      partition->unaggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
   spilled_partitions_.push_front(partition);
+  return Status::OK();
 }
 
 void GroupingAggregator::ClosePartitions() {
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index 258ed6e..dea7e3f 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -586,7 +586,7 @@ class GroupingAggregator : public Aggregator {
   /// 'spilled_partitions_' uses LIFO so more finely partitioned partitions are processed
   /// first). This allows us to delete pages earlier and bottom out the recursion
   /// earlier and also improves time locality of access to spilled data on disk.
-  void PushSpilledPartition(Partition* partition);
+  Status PushSpilledPartition(Partition* partition) WARN_UNUSED_RESULT;
 
   /// Calls Close() on 'output_partition_' and every Partition in
   /// 'aggregated_partitions_', 'spilled_partitions_', and 'hash_partitions_' and then
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 83fb432..175d7d8 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -361,7 +361,8 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
       partition->Close(NULL);
     } else if (partition->is_spilled()) {
       // We don't need any build-side data for spilled partitions in memory.
-      partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+      RETURN_IF_ERROR(
+          partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
     }
   }
 
@@ -628,7 +629,7 @@ Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) {
     hash_tbl_->Close();
     hash_tbl_.reset();
   }
-  build_rows_->UnpinStream(mode);
+  RETURN_IF_ERROR(build_rows_->UnpinStream(mode));
   if (!is_spilled_) {
     COUNTER_ADD(parent_->num_spilled_partitions_, 1);
     if (parent_->num_spilled_partitions_->value() == 1) {
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 99931b4..42385d0 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -421,7 +421,8 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
     // Spill to free memory from hash tables and pinned streams for use in new partitions.
     RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStream::UNPIN_ALL));
     // Temporarily free up the probe buffer to use when repartitioning.
-    input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+    RETURN_IF_ERROR(
+        input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
     DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0) << NodeDebugString();
     DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString();
     int64_t num_input_rows = build_partition->build_rows()->num_rows();
@@ -992,9 +993,10 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
       && (have_spilled_hash_partitions
              || builder_->null_aware_partition()->is_spilled())) {
-    null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
-    null_aware_probe_partition_->probe_rows()->UnpinStream(
-        BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+    RETURN_IF_ERROR(
+        null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    RETURN_IF_ERROR(null_aware_probe_partition_->probe_rows()->UnpinStream(
+        BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
   }
 
   // Initialize the hash_tbl_ caching array.
@@ -1034,7 +1036,8 @@ bool PartitionedHashJoinNode::AppendProbeRowSlow(
   if (!status->ok()) return false; // Check if AddRow() set status.
   *status = runtime_state_->StartSpilling(mem_tracker());
   if (!status->ok()) return false;
-  stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  *status = stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  if (!status->ok()) return false;
   return stream->AddRow(row, status);
 }
 
@@ -1119,9 +1122,11 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
       // can recurse the algorithm and create new hash partitions from spilled partitions.
       // TODO: we shouldn't need to unpin the build stream if we stop spilling
       // while probing.
-      build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+      RETURN_IF_ERROR(
+          build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
       DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0);
-      probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+      RETURN_IF_ERROR(
+          probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
 
       if (probe_partition->probe_rows()->num_rows() != 0
           || NeedToProcessUnmatchedBuildRows()) {
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 85cceb0..6b097dd 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -326,7 +326,7 @@ class SimpleTupleStreamTest : public testing::Test {
     ASSERT_TRUE(got_write_reservation);
 
     if (unpin_stream) {
-      stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+      ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
     }
     // Add rows to the stream
     int offset = 0;
@@ -369,7 +369,7 @@ class SimpleTupleStreamTest : public testing::Test {
     ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
     ASSERT_TRUE(got_reservation);
     if (unpin_stream) {
-      stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+      ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
     }
 
     vector<int> results;
@@ -640,7 +640,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
     // Make sure we can switch between pinned and unpinned states while writing.
     if (num_batches % 10 == 0) {
       bool pinned;
-      stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+      ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
       ASSERT_OK(stream.PinStream(&pinned));
       DCHECK(pinned);
     }
@@ -658,7 +658,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
     ++num_batches;
   }
 
-  stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
 
   bool pinned = false;
   ASSERT_OK(stream.PinStream(&pinned));
@@ -1116,7 +1116,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   bool success = stream.AddRow(batch->GetRow(0), &status);
   ASSERT_FALSE(success);
   ASSERT_OK(status);
-  stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
   success = stream.AddRow(batch->GetRow(0), &status);
   ASSERT_TRUE(success);
   // Read all the rows back and verify.
@@ -1187,7 +1187,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   bool success = stream.AddRow(write_row, &status);
   ASSERT_FALSE(success);
   ASSERT_OK(status);
-  stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+  ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
   success = stream.AddRow(write_row, &status);
   ASSERT_TRUE(success);
 
@@ -1210,6 +1210,72 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
+// Test that UnpinStream advances the read page if all rows from the read page are
+// attached to a returned RowBatch.
+TEST_F(SimpleTupleStreamTest, UnpinReadPage) {
+  int num_rows = 1024;
+  int buffer_size = 4 * 1024;
+  Init(100 * buffer_size);
+
+  bool eos;
+  bool got_reservation;
+  Status status;
+  RowBatch* write_batch = CreateIntBatch(0, num_rows, false);
+
+  {
+    // Test unpinning a stream when the read page has been attached to the output batch.
+    BufferedTupleStream stream(
+        runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+    ASSERT_OK(stream.Init("SimpleTupleStreamTest::UnpinReadPage", true));
+    ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
+    ASSERT_TRUE(got_reservation);
+
+    // Add rows to stream.
+    for (int i = 0; i < write_batch->num_rows(); ++i) {
+      EXPECT_TRUE(stream.AddRow(write_batch->GetRow(i), &status));
+      ASSERT_OK(status);
+    }
+
+    // Read until the read page is attached to the output.
+    RowBatch read_batch(int_desc_, num_rows, &tracker_);
+    ASSERT_OK(stream.GetNext(&read_batch, &eos));
+    // If GetNext did hit the capacity of the RowBatch, then the read page should have
+    // been attached to read_batch.
+    ASSERT_TRUE(read_batch.num_rows() < num_rows);
+    ASSERT_TRUE(!eos);
+    read_batch.Reset();
+
+    // Unpin the stream.
+    ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+
+  {
+    // Test unpinning an empty stream (all rows have been attached to RowBatches).
+    BufferedTupleStream stream(
+        runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+    ASSERT_OK(stream.Init("SimpleTupleStreamTest::UnpinReadPage", true));
+    ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
+    ASSERT_TRUE(got_reservation);
+
+    for (int i = 0; i < write_batch->num_rows(); ++i) {
+      EXPECT_TRUE(stream.AddRow(write_batch->GetRow(i), &status));
+      ASSERT_OK(status);
+    }
+
+    // Read and validate all contents of the stream.
+    vector<int> results;
+    ReadValues(&stream, int_desc_, &results);
+    VerifyResults<int>(*int_desc_, results, num_rows, false);
+
+    // Unpin and close the stream.
+    ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+
+  write_batch->Reset();
+}
+
 // Basic API test. No data should be going to disk.
 TEST_F(SimpleNullStreamTest, Basic) {
   Init(BUFFER_POOL_LIMIT);
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 87c07eb..a37c443 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -210,7 +210,7 @@ string BufferedTupleStream::Page::DebugString() const {
 }
 
 Status BufferedTupleStream::Init(const string& caller_label, bool pinned) {
-  if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT);
+  if (!pinned) RETURN_IF_ERROR(UnpinStream(UNPIN_ALL_EXCEPT_CURRENT));
   caller_label_ = caller_label;
   return Status::OK();
 }
@@ -659,7 +659,7 @@ Status BufferedTupleStream::PinStream(bool* pinned) {
   return Status::OK();
 }
 
-void BufferedTupleStream::UnpinStream(UnpinMode mode) {
+Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
   CHECK_CONSISTENCY_FULL();
   DCHECK(!closed_);
   if (mode == UNPIN_ALL) {
@@ -670,6 +670,11 @@ void BufferedTupleStream::UnpinStream(UnpinMode mode) {
 
   if (pinned_) {
     CHECK_CONSISTENCY_FULL();
+    if (&*read_page_ != write_page_ && read_page_ != pages_.end()
+        && read_page_rows_returned_ == read_page_->num_rows) {
+      RETURN_IF_ERROR(NextReadPage());
+    }
+
     // If the stream was pinned, there may be some remaining pinned pages that should
     // be unpinned at this point.
     for (Page& page : pages_) UnpinPageIfNeeded(&page, false);
@@ -684,6 +689,7 @@ void BufferedTupleStream::UnpinStream(UnpinMode mode) {
     pinned_ = false;
   }
   CHECK_CONSISTENCY_FULL();
+  return Status::OK();
 }
 
 Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos) {
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index b0cb8db..e98a9fe 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -316,7 +316,7 @@ class BufferedTupleStream {
   };
 
   /// Unpins stream with the given 'mode' as described above.
-  void UnpinStream(UnpinMode mode);
+  Status UnpinStream(UnpinMode mode) WARN_UNUSED_RESULT;
 
   /// Get the next batch of output rows, which are backed by the stream's memory.
   ///
diff --git a/be/src/runtime/spillable-row-batch-queue.cc b/be/src/runtime/spillable-row-batch-queue.cc
index a9be9f7..da21660 100644
--- a/be/src/runtime/spillable-row-batch-queue.cc
+++ b/be/src/runtime/spillable-row-batch-queue.cc
@@ -84,7 +84,8 @@ Status SpillableRowBatchQueue::AddBatch(RowBatch* batch) {
       DCHECK_EQ(batch_queue_->bytes_unpinned(), 0);
 
       // Unpin the stream and then add the row.
-      batch_queue_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+      RETURN_IF_ERROR(
+          batch_queue_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
 
       // Append "Spilled" to the "ExecOption" info string in the runtime profile.
       profile_->AppendExecOption("Spilled");


[impala] 01/04: IMPALA-8896: fix alterPartitionsWithTransaction() that emitted too many alter tables.

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit db007486a3d11ca935c3a6722f9ae388fb097853
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Aug 27 12:05:21 2019 +0200

    IMPALA-8896: fix alterPartitionsWithTransaction() that emitted too many alter tables.
    
    Moved the 'alter_partitions()' RPC outside of the for-loop in
    alterPartitionsWithTransaction(). Having it inside the for-loop
    resulted in many unnecessary RPC calls to HMS.
    
    Change-Id: I47d9d732caf3093206efb1f283fec2eee3347671
    Reviewed-on: http://gerrit.cloudera.org:8080/14148
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/compat/MetastoreShim.java              | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index e96e7b3..d5a3da6 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -237,11 +237,11 @@ public class MetastoreShim {
       ) throws InvalidOperationException, MetaException, TException {
     for (Partition part : partitions) {
       part.setWriteId(tblTxn.writeId);
-      // Correct validWriteIdList is needed
-      // to commit the alter partitions operation in hms side.
-      client.alter_partitions(dbName, tblName, partitions, null,
-           tblTxn.validWriteIds, tblTxn.writeId);
     }
+    // Correct validWriteIdList is needed
+    // to commit the alter partitions operation in hms side.
+    client.alter_partitions(dbName, tblName, partitions, null,
+        tblTxn.validWriteIds, tblTxn.writeId);
   }
 
   /**


[impala] 04/04: Fix THttpServer to not call the cookie function with an empty cookie

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3209055ba2b5a05bedf7c9ecf0edd0e06c2525c9
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Tue Aug 27 09:54:19 2019 -0700

    Fix THttpServer to not call the cookie function with an empty cookie
    
    This patch checks whether the value passed in the 'Cookie' header to
    the HTTP hs2 server is blank and, if so, ignores it.
    
    This way a client that sends an empty 'Cookie' header is no longer
    incorrectly counted as a failed cookie authentication attempt.
    
    Change-Id: I04e96fe97baae474a82fd30f2cd55ccce80570b4
    Reviewed-on: http://gerrit.cloudera.org:8080/14149
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/transport/THttpServer.cpp | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp
index e1de7bf..120efe2 100644
--- a/be/src/transport/THttpServer.cpp
+++ b/be/src/transport/THttpServer.cpp
@@ -176,11 +176,15 @@ void THttpServer::headersDone() {
   // Try authenticating with cookies first.
   if (use_cookies_ && !cookie_value_.empty()) {
     StripWhiteSpace(&cookie_value_);
-    if (callbacks_.cookie_auth_fn(cookie_value_)) {
-      authorized = true;
-      if (metrics_enabled_) http_metrics_->total_cookie_auth_success_->Increment(1);
-    } else if (metrics_enabled_) {
-      http_metrics_->total_cookie_auth_failure_->Increment(1);
+    // If a 'Cookie' header was provided with an empty value, we ignore it rather than
+    // counting it as a failed cookie attempt.
+    if (!cookie_value_.empty()) {
+      if (callbacks_.cookie_auth_fn(cookie_value_)) {
+        authorized = true;
+        if (metrics_enabled_) http_metrics_->total_cookie_auth_success_->Increment(1);
+      } else if (metrics_enabled_) {
+        http_metrics_->total_cookie_auth_failure_->Increment(1);
+      }
     }
   }