You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2018/04/23 17:20:02 UTC

hive git commit: HIVE-18743 : CREATE TABLE on S3 data can be extremely slow. DO_NOT_UPDATE_STATS workaround is buggy (Alexander Kolbasov, reviewed by Zoltan Haindrich via Vihang Karajgaonkar)

Repository: hive
Updated Branches:
  refs/heads/branch-2 780db578b -> b9b0b0aa2


HIVE-18743 : CREATE TABLE on S3 data can be extremely slow. DO_NOT_UPDATE_STATS workaround is buggy (Alexander Kolbasov, reviewed by Zoltan Haindrich via Vihang Karajgaonkar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b9b0b0aa
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b9b0b0aa
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b9b0b0aa

Branch: refs/heads/branch-2
Commit: b9b0b0aa2f53d22d92f521020036a8355ecd3f83
Parents: 780db57
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Mon Apr 23 10:13:49 2018 -0700
Committer: Vihang Karajgaonkar <vi...@cloudera.com>
Committed: Mon Apr 23 10:13:49 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/metastore/HiveAlterHandler.java |   2 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    |   4 +-
 .../hadoop/hive/metastore/MetaStoreUtils.java   |  96 ++++-----
 .../hive/metastore/TestMetaStoreUtils.java      | 212 +++++++++++++++++++
 .../ql/metadata/SessionHiveMetaStoreClient.java |   3 +-
 5 files changed, 259 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b9b0b0aa/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 83c68a2..3d7df22 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -289,7 +289,7 @@ public class HiveAlterHandler implements AlterHandler {
             !isPartitionedTable) {
           Database db = msdb.getDatabase(newDbName);
           // Update table stats. For partitioned table, we update stats in alterPartition()
-          MetaStoreUtils.updateTableStatsFast(db, newt, wh, false, true, environmentContext);
+          MetaStoreUtils.updateTableStatsSlow(db, newt, wh, false, true, environmentContext);
         }
 
         if (cascade && isPartitionedTable) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b9b0b0aa/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index aa233dd..02c345b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -63,7 +63,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.JvmPauseMonitor;
@@ -85,7 +84,6 @@ import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
-import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
 import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
 import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
@@ -1455,7 +1453,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
         if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
             !MetaStoreUtils.isView(tbl)) {
-          MetaStoreUtils.updateTableStatsFast(db, tbl, wh, madeDir, envContext);
+          MetaStoreUtils.updateTableStatsSlow(db, tbl, wh, madeDir, false, envContext);
         }
 
         // set create time

http://git-wip-us.apache.org/repos/asf/hive/blob/b9b0b0aa/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
index 13bdf8e..3b2da10 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
@@ -188,72 +188,64 @@ public class MetaStoreUtils {
     return true;
   }
 
-  public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh,
-      boolean madeDir, EnvironmentContext environmentContext) throws MetaException {
-    return updateTableStatsFast(db, tbl, wh, madeDir, false, environmentContext);
-  }
-
-  public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh,
-      boolean madeDir, boolean forceRecompute, EnvironmentContext environmentContext) throws MetaException {
-    if (tbl.getPartitionKeysSize() == 0) {
-      // Update stats only when unpartitioned
-      FileStatus[] fileStatuses = wh.getFileStatusesForUnpartitionedTable(db, tbl);
-      return updateTableStatsFast(tbl, fileStatuses, madeDir, forceRecompute, environmentContext);
-    } else {
-      return false;
-    }
-  }
-
   /**
    * Updates the numFiles and totalSize parameters for the passed Table by querying
    * the warehouse if the passed Table does not already have values for these parameters.
+   * NOTE: This function is rather expensive since it needs to traverse the file system to get all
+   * the information.
+   *
    * @param tbl
-   * @param fileStatus
    * @param newDir if true, the directory was just created and can be assumed to be empty
    * @param forceRecompute Recompute stats even if the passed Table already has
    * these parameters set
-   * @return true if the stats were updated, false otherwise
    */
-  public static boolean updateTableStatsFast(Table tbl, FileStatus[] fileStatus, boolean newDir,
-      boolean forceRecompute, EnvironmentContext environmentContext) throws MetaException {
-
+  public static void updateTableStatsSlow(Database db, Table tbl, Warehouse wh,
+                                          boolean newDir, boolean forceRecompute,
+                                          EnvironmentContext environmentContext) throws MetaException {
+    // DO_NOT_UPDATE_STATS is supposed to be a transient parameter that is only passed via RPC.
+    // We want to prevent this property from being persisted.
+    //
+    // NOTE: If this property *is* set as table property we will remove it which is incorrect but
+    // we can't distinguish between these two cases
+    //
+    // This problem was introduced by HIVE-10228. A better approach would be to pass the property
+    // via the environment context.
     Map<String,String> params = tbl.getParameters();
-
-    if ((params!=null) && params.containsKey(StatsSetupConst.DO_NOT_UPDATE_STATS)){
-      boolean doNotUpdateStats = Boolean.valueOf(params.get(StatsSetupConst.DO_NOT_UPDATE_STATS));
+    boolean updateStats = true;
+    if ((params != null) && params.containsKey(StatsSetupConst.DO_NOT_UPDATE_STATS)) {
+      updateStats = !Boolean.valueOf(params.get(StatsSetupConst.DO_NOT_UPDATE_STATS));
       params.remove(StatsSetupConst.DO_NOT_UPDATE_STATS);
-      tbl.setParameters(params); // to make sure we remove this marker property
-      if (doNotUpdateStats){
-        return false;
-      }
     }
 
-    boolean updated = false;
-    if (forceRecompute ||
-        params == null ||
-        !containsAllFastStats(params)) {
-      if (params == null) {
-        params = new HashMap<String,String>();
-      }
-      if (!newDir) {
-        // The table location already exists and may contain data.
-        // Let's try to populate those stats that don't require full scan.
-        LOG.info("Updating table stats fast for " + tbl.getTableName());
-        populateQuickStats(fileStatus, params);
-        LOG.info("Updated size of table " + tbl.getTableName() +" to "+ params.get(StatsSetupConst.TOTAL_SIZE));
-        if (environmentContext != null
-            && environmentContext.isSetProperties()
-            && StatsSetupConst.TASK.equals(environmentContext.getProperties().get(
-                StatsSetupConst.STATS_GENERATED))) {
-          StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE);
-        } else {
-          StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE);
-        }
-      }
+    if (!updateStats || newDir || tbl.getPartitionKeysSize() != 0) {
+      return;
+    }
+
+    // If stats are already present and forceRecompute isn't set, nothing to do
+    if (!forceRecompute && params != null && containsAllFastStats(params)) {
+      return;
+    }
+
+    // NOTE: wh.getFileStatusesForUnpartitionedTable() can be REALLY slow
+    FileStatus[] fileStatus = wh.getFileStatusesForUnpartitionedTable(db, tbl);
+    if (params == null) {
+      params = new HashMap<>();
       tbl.setParameters(params);
-      updated = true;
     }
-    return updated;
+    // The table location already exists and may contain data.
+    // Let's try to populate those stats that don't require full scan.
+    LOG.info("Updating table stats for {}", tbl.getTableName());
+    populateQuickStats(fileStatus, params);
+    LOG.info("Updated size of table {} to {}",
+        tbl.getTableName(), params.get(StatsSetupConst.TOTAL_SIZE));
+    if (environmentContext != null
+        && environmentContext.isSetProperties()
+        && StatsSetupConst.TASK.equals(environmentContext.getProperties().get(
+        StatsSetupConst.STATS_GENERATED))) {
+      StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE);
+    } else {
+      StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE);
+    }
   }
 
   public static void populateQuickStats(FileStatus[] fileStatus, Map<String, String> params) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b9b0b0aa/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreUtils.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreUtils.java
index e5c8a40..0ffe782 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreUtils.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreUtils.java
@@ -18,13 +18,51 @@
 
 package org.apache.hadoop.hive.metastore;
 
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.common.StatsSetupConst.COLUMN_STATS_ACCURATE;
+import static org.apache.hadoop.hive.common.StatsSetupConst.NUM_FILES;
+import static org.apache.hadoop.hive.common.StatsSetupConst.STATS_GENERATED;
+import static org.apache.hadoop.hive.common.StatsSetupConst.TOTAL_SIZE;
+import static org.apache.hadoop.hive.metastore.MetaStoreUtils.updateTableStatsSlow;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestMetaStoreUtils {
+  private static final String DB_NAME = "db1";
+  private static final String TABLE_NAME = "tbl1";
+  private static final String SERDE_LIB = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+
+
+  private final Map<String, String> paramsWithStats = ImmutableMap.of(NUM_FILES, "1", TOTAL_SIZE, "2");
+
+  private Database db;
+
+  public TestMetaStoreUtils() {
+    db = new Database(DB_NAME, "", "/", null);
+  }
 
   @Test
   public void testcolumnsIncludedByNameType() {
@@ -40,4 +78,178 @@ public class TestMetaStoreUtils {
     Assert.assertTrue(MetaStoreUtils.columnsIncludedByNameType(Arrays.asList(col1, col2), Arrays.asList(col3, col2, col1)));
     Assert.assertFalse(MetaStoreUtils.columnsIncludedByNameType(Arrays.asList(col1, col2), Arrays.asList(col1)));
   }
+
+  /**
+   * Verify that updateTableStatsSlow really updates table statistics.
+   * The test does the following:
+   * <ol>
+   *   <li>Create database</li>
+   *   <li>Create unpartitioned table</li>
+   *   <li>Create unpartitioned table which has params</li>
+   *   <li>Call updateTableStatsSlow with arguments which should cause stats calculation</li>
+   *   <li>Verify table statistics using mocked warehouse</li>
+   *   <li>Create table which already have stats</li>
+   *   <li>Call updateTableStatsSlow forcing stats recompute</li>
+   *   <li>Verify table statistics using mocked warehouse</li>
+   *   <li>Verifies behavior when STATS_GENERATED is set in environment context</li>
+   * </ol>
+   */
+  @Test
+  public void testUpdateTableStatsSlow_statsUpdated() throws TException {
+    long fileLength = 5;
+
+    // Create database and table
+    Table tbl = new Table();
+    tbl.setDbName(DB_NAME);
+    tbl.setTableName(TABLE_NAME);
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setCols(Collections.singletonList(new FieldSchema("id", "int", "")));
+    sd.setLocation("/tmp");
+    tbl.setSd(sd);
+    tbl.setParameters(new HashMap<String, String>());
+
+    // Set up mock warehouse
+    FileStatus fs1 = new FileStatus(1, true, 2, 3,
+        4, new Path("/tmp/0"));
+    FileStatus fs2 = new FileStatus(fileLength, false, 3, 4,
+        5, new Path("/tmp/1"));
+    FileStatus fs3 = new FileStatus(fileLength, false, 3, 4,
+        5, new Path("/tmp/1"));
+    FileStatus[] fileStatus = {fs1, fs2, fs3};
+    Warehouse wh = mock(Warehouse.class);
+    when(wh.getFileStatusesForUnpartitionedTable(db, tbl)).thenReturn(fileStatus);
+
+    Map<String, String> expected = ImmutableMap.of(NUM_FILES, "2",
+        TOTAL_SIZE, String.valueOf(2 * fileLength));
+    updateTableStatsSlow(db, tbl, wh, false, false, null);
+    assertThat(tbl.getParameters(), is(expected));
+
+    // Verify that when stats are already present and forceRecompute is specified they are recomputed
+    Table tbl1 = new Table();
+    tbl1.setDbName(DB_NAME);
+    tbl1.setTableName(TABLE_NAME);
+    tbl1.setSd(sd);
+    tbl1.setParameters(new HashMap<String, String>());
+    tbl1.getParameters().put(NUM_FILES, "0");
+    tbl1.getParameters().put(TOTAL_SIZE, "0");
+
+    when(wh.getFileStatusesForUnpartitionedTable(db, tbl1)).thenReturn(fileStatus);
+    updateTableStatsSlow(db, tbl1, wh, false, true, null);
+    assertThat(tbl1.getParameters(), is(expected));
+
+    // Verify that COLUMN_STATS_ACCURATE is removed from params
+    Table tbl2 = new Table();
+    tbl2.setDbName(DB_NAME);
+    tbl2.setTableName(TABLE_NAME);
+    tbl2.setSd(sd);
+    tbl2.setParameters(new HashMap<String, String>());
+    tbl2.getParameters().put(COLUMN_STATS_ACCURATE, "true");
+
+    when(wh.getFileStatusesForUnpartitionedTable(db, tbl2)).thenReturn(fileStatus);
+    updateTableStatsSlow(db, tbl2, wh, false, true, null);
+    assertThat(tbl2.getParameters(), is(expected));
+
+    EnvironmentContext context = new EnvironmentContext(ImmutableMap.of(STATS_GENERATED,
+        StatsSetupConst.TASK));
+
+    // Verify that if environment context has STATS_GENERATED set to task,
+    // COLUMN_STATS_ACCURATE in params is set to correct value
+    Table tbl3 = new Table();
+    tbl3.setDbName(DB_NAME);
+    tbl3.setTableName(TABLE_NAME);
+    tbl3.setSd(sd);
+    tbl3.setParameters(new HashMap<String, String>());
+    tbl3.getParameters().put(COLUMN_STATS_ACCURATE, "foo");
+    when(wh.getFileStatusesForUnpartitionedTable(db, tbl3)).thenReturn(fileStatus);
+    updateTableStatsSlow(db, tbl3, wh, false, true, context);
+
+    Map<String, String> expected1 = ImmutableMap.of(NUM_FILES, "2",
+        TOTAL_SIZE, String.valueOf(2 * fileLength),
+        COLUMN_STATS_ACCURATE, "{\"BASIC_STATS\":\"true\"}");
+    assertThat(tbl3.getParameters(), is(expected1));
+  }
+
+  /**
+   * Verify that updateTableStatsSlow() strips DO_NOT_UPDATE_STATS from the params of every table it is given.
+   */
+  @Test
+  public void testUpdateTableStatsSlow_removesDoNotUpdateStats() throws TException {
+    // Table flagged DO_NOT_UPDATE_STATS=true; tbl1 below carries the same flag set to false
+    Table tbl = new Table();
+    tbl.setDbName(DB_NAME);
+    tbl.setTableName(TABLE_NAME);
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setCols(Collections.singletonList(new FieldSchema("id", "int", "")));
+    sd.setLocation("/tmp");
+    tbl.setSd(sd);
+    tbl.setParameters(new HashMap<String, String>());
+    tbl.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "true");
+
+    Table tbl1 = new Table();
+    tbl1.setDbName(DB_NAME);
+    tbl1.setTableName(TABLE_NAME);
+    tbl1.setSd(sd);
+    tbl1.setParameters(new HashMap<String, String>());
+    tbl1.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "false");
+
+    Warehouse wh = mock(Warehouse.class);
+    updateTableStatsSlow(db, tbl, wh, false, true, null);
+    Map<String, String> expected = Collections.emptyMap();
+    assertThat(tbl.getParameters(), is(expected));
+    verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl);
+    updateTableStatsSlow(db, tbl1, wh, true, false, null);
+    assertThat(tbl1.getParameters(), is(expected));
+    verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl1);
+  }
+
+  /**
+   * Verify that updateTableStatsSlow() does not calculate table statistics when
+   * <ol>
+   *   <li>newDir is true</li>
+   *   <li>Table is partitioned</li>
+   *   <li>Stats are already present and forceRecompute isn't set</li>
+   * </ol>
+   */
+  @Test
+  public void testUpdateTableStatsSlow_doesNotUpdateStats() throws TException {
+    FieldSchema fs = new FieldSchema("date", "string", "date column");
+    List<FieldSchema> cols = Collections.singletonList(fs);
+
+    // Unpartitioned table with empty params, i.e. no stats recorded yet
+    Table tbl = new Table();
+    tbl.setDbName(DB_NAME);
+    tbl.setTableName(TABLE_NAME);
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setCols(Collections.singletonList(new FieldSchema("id", "int", "")));
+    sd.setLocation("/tmp");
+    tbl.setSd(sd);
+    tbl.setParameters(new HashMap<String, String>());
+
+    Warehouse wh = mock(Warehouse.class);
+    // newDir(true) => stats not updated
+    updateTableStatsSlow(db, tbl, wh, true, false, null);
+    verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl);
+
+    // partitioned table => stats not updated
+    Table tbl1 = new Table();
+    tbl1.setDbName(DB_NAME);
+    tbl1.setTableName(TABLE_NAME);
+    tbl1.setPartitionKeys(cols);
+    tbl1.setSd(sd);
+    tbl1.setParameters(new HashMap<String, String>());
+
+    updateTableStatsSlow(db, tbl1, wh, false, false, null);
+    verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl1);
+
+    // Already contains stats => stats not updated when forceRecompute isn't set
+    Table tbl2 = new Table();
+    tbl2.setDbName(DB_NAME);
+    tbl2.setTableName(TABLE_NAME);
+    tbl2.setSd(sd);
+    tbl2.setParameters(paramsWithStats);
+
+    updateTableStatsSlow(db, tbl2, wh, false, false, null);
+    verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl2);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b9b0b0aa/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
index 109bc3a..fd867f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
@@ -447,8 +447,7 @@ public class SessionHiveMetaStoreClient extends HiveMetaStoreClient implements I
     }
 
     org.apache.hadoop.hive.metastore.api.Table newtCopy = deepCopyAndLowerCaseTable(newt);
-    MetaStoreUtils.updateTableStatsFast(newtCopy,
-        getWh().getFileStatusesForSD(newtCopy.getSd()), false, true, envContext);
+    MetaStoreUtils.updateTableStatsSlow(null, newtCopy, getWh(), false, true, envContext);
     Table newTable = new Table(newtCopy);
     String newDbName = newTable.getDbName();
     String newTableName = newTable.getTableName();