Posted to commits@hive.apache.org by ha...@apache.org on 2017/01/23 18:14:13 UTC

hive git commit: HIVE-15439 : Support INSERT OVERWRITE for internal druid datasources. (Slim Bouguerra via Jesus Camacho Rodriguez)

Repository: hive
Updated Branches:
  refs/heads/master 7c57c05cd -> 38c3f1a58


HIVE-15439 : Support INSERT OVERWRITE for internal druid datasources. (Slim Bouguerra via Jesus Camacho Rodriguez)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/38c3f1a5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/38c3f1a5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/38c3f1a5

Branch: refs/heads/master
Commit: 38c3f1a583b974e8a7226311fc07fc50285ef528
Parents: 7c57c05
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Wed Nov 1 12:14:00 2017 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Mon Jan 23 10:13:15 2017 -0800

----------------------------------------------------------------------
 .../results/positive/accumulo_queries.q.out     | 20 +++++--
 .../accumulo_single_sourced_multi_insert.q.out  | 35 ++++++------
 druid-handler/pom.xml                           |  5 ++
 .../hadoop/hive/druid/DruidStorageHandler.java  | 23 +++++++-
 .../hive/druid/DruidStorageHandlerTest.java     | 56 +++++++++++++++++---
 .../hadoop/hive/druid/TestDerbyConnector.java   | 18 +++++++
 .../hive/ql/io/DruidRecordWriterTest.java       | 18 +++++++
 .../test/results/positive/hbase_queries.q.out   | 20 +++++--
 .../hbase_single_sourced_multi_insert.q.out     | 35 ++++++------
 .../src/test/results/positive/hbasestats.q.out  |  7 ++-
 .../hadoop/hive/metastore/HiveMetaHookV2.java   | 51 ++++++++++++++++++
 .../hive/metastore/HiveMetaStoreClient.java     | 18 +++++++
 .../hadoop/hive/metastore/IMetaStoreClient.java | 11 +++-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 14 +++++
 ...tedDynPartitionTimeGranularityOptimizer.java | 13 ++++-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 22 +++++---
 .../org/apache/hadoop/hive/ql/plan/DDLWork.java | 17 ++++++
 .../hadoop/hive/ql/plan/InsertTableDesc.java    | 40 ++++++++++++++
 18 files changed, 364 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/accumulo-handler/src/test/results/positive/accumulo_queries.q.out
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/results/positive/accumulo_queries.q.out b/accumulo-handler/src/test/results/positive/accumulo_queries.q.out
index a6d2632..d7cceec 100644
--- a/accumulo-handler/src/test/results/positive/accumulo_queries.q.out
+++ b/accumulo-handler/src/test/results/positive/accumulo_queries.q.out
@@ -40,7 +40,8 @@ POSTHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE accumulo_table_1 SELECT
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-1 is a root stage
+  Stage-1
+  Stage-2 is a root stage
 
 STAGE PLANS:
   Stage: Stage-0
@@ -52,6 +53,10 @@ STAGE PLANS:
             COLUMN_STATS_ACCURATE 
 
   Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-2
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -490,8 +495,9 @@ ON (x.key = Y.key)
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-2 is a root stage
-  Stage-1 depends on stages: Stage-2
+  Stage-1
+  Stage-3 is a root stage
+  Stage-2 depends on stages: Stage-3
 
 STAGE PLANS:
   Stage: Stage-0
@@ -502,7 +508,11 @@ STAGE PLANS:
           properties:
             COLUMN_STATS_ACCURATE 
 
-  Stage: Stage-2
+  Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-3
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -537,7 +547,7 @@ STAGE PLANS:
                 output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                 serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
 
-  Stage: Stage-1
+  Stage: Stage-2
     Map Reduce
       Map Operator Tree:
           TableScan

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out
----------------------------------------------------------------------
diff --git a/accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out b/accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out
index 55e7176..7330746 100644
--- a/accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out
+++ b/accumulo-handler/src/test/results/positive/accumulo_single_sourced_multi_insert.q.out
@@ -34,14 +34,15 @@ select value,"" where a.key > 50 AND a.key < 100
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-2 is a root stage
-  Stage-8 depends on stages: Stage-2 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-1 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-3 depends on stages: Stage-1
-  Stage-4
+  Stage-1
+  Stage-3 is a root stage
+  Stage-9 depends on stages: Stage-3 , consists of Stage-6, Stage-5, Stage-7
   Stage-6
-  Stage-7 depends on stages: Stage-6
+  Stage-2 depends on stages: Stage-6, Stage-5, Stage-8
+  Stage-4 depends on stages: Stage-2
+  Stage-5
+  Stage-7
+  Stage-8 depends on stages: Stage-7
 
 STAGE PLANS:
   Stage: Stage-0
@@ -52,7 +53,11 @@ STAGE PLANS:
           properties:
             COLUMN_STATS_ACCURATE 
 
-  Stage: Stage-2
+  Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-3
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -89,16 +94,16 @@ STAGE PLANS:
                       serde: org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe
                       name: default.src_x2
 
-  Stage: Stage-8
+  Stage: Stage-9
     Conditional Operator
 
-  Stage: Stage-5
+  Stage: Stage-6
     Move Operator
       files:
           hdfs directory: true
 #### A masked pattern was here ####
 
-  Stage: Stage-1
+  Stage: Stage-2
     Move Operator
       tables:
           replace: true
@@ -108,10 +113,10 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.src_x1
 
-  Stage: Stage-3
+  Stage: Stage-4
     Stats-Aggr Operator
 
-  Stage: Stage-4
+  Stage: Stage-5
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -123,7 +128,7 @@ STAGE PLANS:
                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                   name: default.src_x1
 
-  Stage: Stage-6
+  Stage: Stage-7
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -135,7 +140,7 @@ STAGE PLANS:
                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                   name: default.src_x1
 
-  Stage: Stage-7
+  Stage: Stage-8
     Move Operator
       files:
           hdfs directory: true

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/druid-handler/pom.xml
----------------------------------------------------------------------
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index d0104dc..a7a0f8f 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -86,6 +86,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>${joda.version}</version>
+    </dependency>
+    <dependency>
       <groupId>io.druid</groupId>
       <artifactId>druid-server</artifactId>
       <version>${druid.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index 2fa942f..904ac80 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.serde.DruidSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaHookV2;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -78,7 +79,7 @@ import java.util.concurrent.Callable;
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
  */
 @SuppressWarnings({ "deprecation", "rawtypes" })
-public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHook {
+public class DruidStorageHandler extends DefaultStorageHandler implements HiveMetaHookV2 {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
 
@@ -464,6 +465,26 @@ public class DruidStorageHandler extends DefaultStorageHandler implements HiveMe
   }
 
   @Override
+  public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
+    if (overwrite) {
+      LOG.debug(String.format("commit insert overwrite into table [%s]", table.getTableName()));
+      this.commitCreateTable(table);
+    } else {
+      throw new MetaException("Insert into is not supported yet");
+    }
+  }
+
+  @Override
+  public void preInsertTable(Table table, boolean overwrite) throws MetaException {
+    //do nothing
+  }
+
+  @Override
+  public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException {
+    // do nothing
+  }
+
+  @Override
   public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties
   ) {
     jobProperties.put(Constants.DRUID_SEGMENT_VERSION, new DateTime().toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
index e7eb4cc..3573bf9 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.druid;
 
 import com.google.common.collect.ImmutableMap;
@@ -43,14 +61,14 @@ public class DruidStorageHandlerTest {
 
   private String segmentsTable;
 
-  private String tablePath;
+  private String tableWorkingPath;
 
   private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
           .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
 
   @Before
   public void before() throws Throwable {
-    tablePath = temporaryFolder.newFolder().getAbsolutePath();
+    tableWorkingPath = temporaryFolder.newFolder().getAbsolutePath();
     segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
     Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
     Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
@@ -115,14 +133,10 @@ public class DruidStorageHandlerTest {
     druidStorageHandler.preCreateTable(tableMock);
     Configuration config = new Configuration();
     config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
-    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tablePath);
+    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
     druidStorageHandler.setConf(config);
     LocalFileSystem localFileSystem = FileSystem.getLocal(config);
-    /*
-    final descriptor path is in the form tablePath/taskId_Attempt_ID/segmentDescriptorDir/segmentIdentifier.json
-    UUID.randomUUID() will fake the taskId_attemptID
-    */
-    Path taskDirPath = new Path(tablePath, druidStorageHandler.makeStagingName());
+    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
     Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
             new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
     );
@@ -141,6 +155,32 @@ public class DruidStorageHandlerTest {
   }
 
   @Test
+  public void testCommitInsertTable() throws MetaException, IOException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            null
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+    Configuration config = new Configuration();
+    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+            new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+    druidStorageHandler.commitCreateTable(tableMock);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+                    derbyConnectorRule.metadataTablesConfigSupplier().get()
+            )).toArray());
+  }
+
+  @Test
   public void testDeleteSegment() throws IOException, SegmentLoadingException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),

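The new testCommitInsertTable covers the INSERT OVERWRITE path only. A companion test for the non-overwrite branch of DruidStorageHandler.commitInsertTable could look like the sketch below; it reuses the fixtures shown in the diff above but is a hypothetical addition, not part of this commit.

  @Test(expected = MetaException.class)
  public void testCommitInsertIntoNotSupported() throws MetaException {
    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
            derbyConnectorRule.getConnector(),
            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
            derbyConnectorRule.metadataTablesConfigSupplier().get(),
            null
    );
    // overwrite == false corresponds to plain INSERT INTO, which the handler rejects for now
    druidStorageHandler.commitInsertTable(tableMock, false);
  }
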
http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
index 75c0129..1014ab6 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.druid;
 
 import com.google.common.base.Supplier;

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
index a4272ee..b1310b6 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.io;
 
 import com.fasterxml.jackson.databind.ObjectMapper;

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/hbase-handler/src/test/results/positive/hbase_queries.q.out
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/results/positive/hbase_queries.q.out b/hbase-handler/src/test/results/positive/hbase_queries.q.out
index d5c1cfa..1eeaf80 100644
--- a/hbase-handler/src/test/results/positive/hbase_queries.q.out
+++ b/hbase-handler/src/test/results/positive/hbase_queries.q.out
@@ -40,7 +40,8 @@ POSTHOOK: query: EXPLAIN FROM src INSERT OVERWRITE TABLE hbase_table_1 SELECT *
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-1 is a root stage
+  Stage-1
+  Stage-2 is a root stage
 
 STAGE PLANS:
   Stage: Stage-0
@@ -52,6 +53,10 @@ STAGE PLANS:
             COLUMN_STATS_ACCURATE 
 
   Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-2
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -493,8 +498,9 @@ ON (x.key = Y.key)
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-2 is a root stage
-  Stage-1 depends on stages: Stage-2
+  Stage-1
+  Stage-3 is a root stage
+  Stage-2 depends on stages: Stage-3
 
 STAGE PLANS:
   Stage: Stage-0
@@ -505,7 +511,11 @@ STAGE PLANS:
           properties:
             COLUMN_STATS_ACCURATE 
 
-  Stage: Stage-2
+  Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-3
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -540,7 +550,7 @@ STAGE PLANS:
                 output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                 serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
 
-  Stage: Stage-1
+  Stage: Stage-2
     Map Reduce
       Map Operator Tree:
           TableScan

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out b/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out
index a552350..079fb0e 100644
--- a/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out
+++ b/hbase-handler/src/test/results/positive/hbase_single_sourced_multi_insert.q.out
@@ -34,14 +34,15 @@ select value,"" where a.key > 50 AND a.key < 100
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-2 is a root stage
-  Stage-8 depends on stages: Stage-2 , consists of Stage-5, Stage-4, Stage-6
-  Stage-5
-  Stage-1 depends on stages: Stage-5, Stage-4, Stage-7
-  Stage-3 depends on stages: Stage-1
-  Stage-4
+  Stage-1
+  Stage-3 is a root stage
+  Stage-9 depends on stages: Stage-3 , consists of Stage-6, Stage-5, Stage-7
   Stage-6
-  Stage-7 depends on stages: Stage-6
+  Stage-2 depends on stages: Stage-6, Stage-5, Stage-8
+  Stage-4 depends on stages: Stage-2
+  Stage-5
+  Stage-7
+  Stage-8 depends on stages: Stage-7
 
 STAGE PLANS:
   Stage: Stage-0
@@ -52,7 +53,11 @@ STAGE PLANS:
           properties:
             COLUMN_STATS_ACCURATE 
 
-  Stage: Stage-2
+  Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-3
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -89,16 +94,16 @@ STAGE PLANS:
                       serde: org.apache.hadoop.hive.hbase.HBaseSerDe
                       name: default.src_x2
 
-  Stage: Stage-8
+  Stage: Stage-9
     Conditional Operator
 
-  Stage: Stage-5
+  Stage: Stage-6
     Move Operator
       files:
           hdfs directory: true
 #### A masked pattern was here ####
 
-  Stage: Stage-1
+  Stage: Stage-2
     Move Operator
       tables:
           replace: true
@@ -108,10 +113,10 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.src_x1
 
-  Stage: Stage-3
+  Stage: Stage-4
     Stats-Aggr Operator
 
-  Stage: Stage-4
+  Stage: Stage-5
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -123,7 +128,7 @@ STAGE PLANS:
                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                   name: default.src_x1
 
-  Stage: Stage-6
+  Stage: Stage-7
     Map Reduce
       Map Operator Tree:
           TableScan
@@ -135,7 +140,7 @@ STAGE PLANS:
                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                   name: default.src_x1
 
-  Stage: Stage-7
+  Stage: Stage-8
     Move Operator
       files:
           hdfs directory: true

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/hbase-handler/src/test/results/positive/hbasestats.q.out
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/results/positive/hbasestats.q.out b/hbase-handler/src/test/results/positive/hbasestats.q.out
index cf4138e..4e47bf5 100644
--- a/hbase-handler/src/test/results/positive/hbasestats.q.out
+++ b/hbase-handler/src/test/results/positive/hbasestats.q.out
@@ -63,7 +63,8 @@ POSTHOOK: query: explain INSERT OVERWRITE TABLE users SELECT 'user1', 'IA', 'USA
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
-  Stage-1 is a root stage
+  Stage-1
+  Stage-2 is a root stage
 
 STAGE PLANS:
   Stage: Stage-0
@@ -75,6 +76,10 @@ STAGE PLANS:
             COLUMN_STATS_ACCURATE 
 
   Stage: Stage-1
+      Insert operator:
+        Insert
+
+  Stage: Stage-2
     Map Reduce
       Map Operator Tree:
           TableScan

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookV2.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookV2.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookV2.java
new file mode 100644
index 0000000..e691c1f
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookV2.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+public interface HiveMetaHookV2 extends HiveMetaHook {
+  /**
+   * Called after an INSERT [OVERWRITE] statement has executed successfully.
+   * @param table table definition
+   * @param overwrite true if it is INSERT OVERWRITE
+   *
+   * @throws MetaException
+   */
+  public void commitInsertTable(Table table, boolean overwrite) throws MetaException;
+
+  /**
+   * Called before commitInsertTable is invoked.
+   * @param table table definition
+   * @param overwrite true if it is INSERT OVERWRITE
+   *
+   * @throws MetaException
+   */
+  public void preInsertTable(Table table, boolean overwrite) throws MetaException;
+
+  /**
+   * Called when preInsertTable or commitInsertTable fails.
+   * @param table table definition
+   * @param overwrite true if it is INSERT OVERWRITE
+   *
+   * @throws MetaException
+   */
+  public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException;
+}

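For context, any storage handler can opt into the new insert life-cycle by implementing HiveMetaHookV2 instead of HiveMetaHook. A minimal sketch follows; the class and package names are illustrative, and the class is left abstract so the pre-existing create/drop callbacks inherited from HiveMetaHook are omitted for brevity.

  package org.example.hive; // hypothetical package

  import org.apache.hadoop.hive.metastore.HiveMetaHookV2;
  import org.apache.hadoop.hive.metastore.api.MetaException;
  import org.apache.hadoop.hive.metastore.api.Table;

  public abstract class ExampleInsertAwareHook implements HiveMetaHookV2 {
    @Override
    public void preInsertTable(Table table, boolean overwrite) throws MetaException {
      // stage any external state before Hive starts moving data
    }

    @Override
    public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
      // make the newly written data visible in the external system;
      // 'overwrite' distinguishes INSERT OVERWRITE from plain INSERT INTO
    }

    @Override
    public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException {
      // undo whatever preInsertTable staged when the statement fails
    }
  }
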
http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 9eec56a..83b481c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2178,6 +2178,24 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
     client.add_dynamic_partitions(adp);
   }
 
+  @Override
+  public void insertTable(Table table, boolean overwrite) throws MetaException {
+    boolean failed = true;
+    HiveMetaHook hook = getHook(table);
+    if (hook == null || !(hook instanceof HiveMetaHookV2)) {
+      return;
+    }
+    HiveMetaHookV2 hiveMetaHook = (HiveMetaHookV2) hook;
+    try {
+      hiveMetaHook.preInsertTable(table, overwrite);
+      hiveMetaHook.commitInsertTable(table, overwrite);
+    } finally {
+      if (failed) {
+        hiveMetaHook.rollbackInsertTable(table, overwrite);
+      }
+    }
+  }
+
   @InterfaceAudience.LimitedPrivate({"HCatalog"})
   @Override
   public NotificationEventResponse getNextNotification(long lastEventId, int maxEvents,

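Note that in the hunk above, failed is initialized to true and never cleared, so rollbackInsertTable also runs after a successful commit. The presumably intended sequencing is sketched below; this is a reading of the patch, not part of it.

  @Override
  public void insertTable(Table table, boolean overwrite) throws MetaException {
    HiveMetaHook hook = getHook(table);
    if (!(hook instanceof HiveMetaHookV2)) {
      return; // the storage handler does not listen to insert events
    }
    HiveMetaHookV2 hiveMetaHook = (HiveMetaHookV2) hook;
    boolean failed = true;
    try {
      hiveMetaHook.preInsertTable(table, overwrite);
      hiveMetaHook.commitInsertTable(table, overwrite);
      failed = false; // commit succeeded, so skip the rollback
    } finally {
      if (failed) {
        hiveMetaHook.rollbackInsertTable(table, overwrite);
      }
    }
  }
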
http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 8ba7352..fb61db1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -1507,6 +1507,15 @@ public interface IMetaStoreClient {
     throws TException;
 
   /**
+   * Performs the pre-insert/commit/rollback calls on the table's storage handler hook for an INSERT statement.
+   * @param table table definition
+   * @param overwrite true if the insert is overwrite
+   *
+   * @throws MetaException
+   */
+  void insertTable(Table table, boolean overwrite) throws MetaException;
+
+  /**
    * A filter provided by the client that determines if a given notification event should be
    * returned.
    */
@@ -1631,7 +1640,7 @@ public interface IMetaStoreClient {
     List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
     throws AlreadyExistsException, InvalidObjectException, MetaException, NoSuchObjectException, TException;
 
-  void dropConstraint(String dbName, String tableName, String constraintName) throws 
+  void dropConstraint(String dbName, String tableName, String constraintName) throws
     MetaException, NoSuchObjectException, TException;
 
   void addPrimaryKey(List<SQLPrimaryKey> primaryKeyCols) throws

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index fc156c7..a930408 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -165,6 +165,7 @@ import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.GrantDesc;
 import org.apache.hadoop.hive.ql.plan.GrantRevokeRoleDDL;
+import org.apache.hadoop.hive.ql.plan.InsertTableDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.LockTableDesc;
@@ -562,6 +563,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       if (cacheMetadataDesc != null) {
         return cacheMetadata(db, cacheMetadataDesc);
       }
+      InsertTableDesc insertTableDesc = work.getInsertTableDesc();
+      if (insertTableDesc != null) {
+        return insertCommitWork(db, insertTableDesc);
+      }
     } catch (Throwable e) {
       failed(e);
       return 1;
@@ -570,6 +575,15 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     return 0;
   }
 
+  private int insertCommitWork(Hive db, InsertTableDesc insertTableDesc) throws HiveException {
+    try {
+      db.getMSC().insertTable(insertTableDesc.getTable(), insertTableDesc.isOverwrite());
+      return 0;
+    } catch (MetaException e) {
+      throw new HiveException(e);
+    }
+  }
+
   private int cacheMetadata(Hive db, CacheMetadataDesc desc) throws HiveException {
     db.cacheFileMetadata(desc.getDbName(), desc.getTableName(),
         desc.getPartName(), desc.isAllParts());

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
index 7ea4754..7a4f22a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionTimeGranularityOptimizer.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -128,8 +129,16 @@ public class SortedDynPartitionTimeGranularityOptimizer extends Transform {
         // Bail out, nothing to do
         return null;
       }
-      String segmentGranularity = parseCtx.getCreateTable().getTblProps()
-              .get(Constants.DRUID_SEGMENT_GRANULARITY);
+      String segmentGranularity = null;
+      final Table table = fsOp.getConf().getTable();
+      if (table != null) {
+        // the statement is an INSERT into an existing Druid table
+        segmentGranularity = table.getParameters().get(Constants.DRUID_SEGMENT_GRANULARITY);
+      } else {
+        // the statement is a CREATE TABLE AS
+        segmentGranularity = parseCtx.getCreateTable().getTblProps()
+                .get(Constants.DRUID_SEGMENT_GRANULARITY);
+      }
       segmentGranularity = !Strings.isNullOrEmpty(segmentGranularity)
               ? segmentGranularity
               : HiveConf.getVar(parseCtx.getConf(),

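The lookup order implemented above is: table parameter for an INSERT into an existing Druid table, then the TBLPROPERTIES of a CREATE TABLE AS, then the configured default (the hunk is cut off before the exact ConfVar name). A standalone sketch of that order; the helper name and the defaultGranularity argument are illustrative only.

  import java.util.Map;
  import com.google.common.base.Strings;
  import org.apache.hadoop.hive.conf.Constants;
  import org.apache.hadoop.hive.ql.metadata.Table;

  final class SegmentGranularityResolver { // hypothetical helper, not in the patch
    static String resolve(Table table, Map<String, String> ctasProps, String defaultGranularity) {
      // INSERT: the target table already exists, so read its parameters;
      // CTAS: the table does not exist yet, so read the statement's TBLPROPERTIES
      String granularity = table != null
          ? table.getParameters().get(Constants.DRUID_SEGMENT_GRANULARITY)
          : ctasProps.get(Constants.DRUID_SEGMENT_GRANULARITY);
      // fall back to the configured default when neither source sets it
      return Strings.isNullOrEmpty(granularity) ? defaultGranularity : granularity;
    }
  }
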
http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 0b4b6e1..266bc7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -177,6 +177,7 @@ import org.apache.hadoop.hive.ql.plan.FilterDesc.SampleDesc;
 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.InsertTableDesc;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc;
@@ -1842,7 +1843,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   public void getMaterializationMetadata(QB qb) throws SemanticException {
     try {
       gatherCTEReferences(qb, rootClause);
-      int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_CTE_MATERIALIZE_THRESHOLD);      
+      int threshold = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_CTE_MATERIALIZE_THRESHOLD);
       for (CTEClause cte : Sets.newHashSet(aliasToCTEs.values())) {
         if (threshold >= 0 && cte.reference >= threshold) {
           cte.materialize = true;
@@ -2556,7 +2557,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     case HiveParser.TOK_CHARSETLITERAL:
     case HiveParser.KW_TRUE:
     case HiveParser.KW_FALSE:
-      break;  
+      break;
 
     case HiveParser.TOK_FUNCTION:
       // check all the arguments
@@ -6765,6 +6766,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         // This is a non-native table.
         // We need to set stats as inaccurate.
         setStatsForNonNativeTable(dest_tab);
+        createInsertDesc(dest_tab, !qb.getParseInfo().isInsertIntoTable(dest_tab.getTableName()));
       }
 
       WriteEntity output = null;
@@ -7029,7 +7031,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);
- 
+
     inputRR = opParseCtx.get(input).getRowResolver();
 
     ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
@@ -7183,6 +7185,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     return output;
   }
 
+  private void createInsertDesc(Table table, boolean overwrite) {
+    Task<? extends Serializable>[] tasks = new Task[this.rootTasks.size()];
+    tasks = this.rootTasks.toArray(tasks);
+    InsertTableDesc insertTableDesc = new InsertTableDesc(table.getTTable(), overwrite);
+    TaskFactory
+            .getAndMakeChild(new DDLWork(getInputs(), getOutputs(), insertTableDesc), conf, tasks);
+  }
+
   private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc,
       Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException {
     String tableName = table_desc.getTableName();
@@ -10760,7 +10770,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           colNames.add(col.getName());
           colTypes.add(col.getType());
         }
-        
+
         basicInfos.put(new HivePrivilegeObject(table.getDbName(), table.getTableName(), colNames),
             new MaskAndFilterInfo(colTypes, additionalTabInfo.toString(), alias, astNode, table.isView()));
       }
@@ -10785,7 +10795,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
   }
-  
+
   // We walk through the AST.
   // We replace all the TOK_TABREF by adding additional masking and filter if
   // the table needs to be masked or filtered.
@@ -10907,7 +10917,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     // masking and filtering should be created here
     // the basic idea is similar to unparseTranslator.
     tableMask = new TableMask(this, conf, ctx);
-    
+
     // 4. continue analyzing from the child ASTNode.
     Phase1Ctx ctx_1 = initPhase1Ctx();
     preProcessForInsert(child, qb);

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
index e069acd..c4efb3f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
  */
 public class DDLWork implements Serializable {
   private static final long serialVersionUID = 1L;
+
+  private InsertTableDesc insertTableDesc;
   private CreateIndexDesc createIndexDesc;
   private AlterIndexDesc alterIndexDesc;
   private DropIndexDesc dropIdxDesc;
@@ -524,6 +526,12 @@ public class DDLWork implements Serializable {
     this.cacheMetadataDesc = cacheMetadataDesc;
   }
 
+  public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+          InsertTableDesc insertTableDesc) {
+    this(inputs, outputs);
+    this.insertTableDesc = insertTableDesc;
+  }
+
   /**
    * @return Create Database descriptor
    */
@@ -1185,4 +1193,13 @@ public class DDLWork implements Serializable {
   public void setShowConfDesc(ShowConfDesc showConfDesc) {
     this.showConfDesc = showConfDesc;
   }
+
+  @Explain(displayName = "Insert operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public InsertTableDesc getInsertTableDesc() {
+    return insertTableDesc;
+  }
+
+  public void setInsertTableDesc(InsertTableDesc insertTableDesc) {
+    this.insertTableDesc = insertTableDesc;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/38c3f1a5/ql/src/java/org/apache/hadoop/hive/ql/plan/InsertTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/InsertTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/InsertTableDesc.java
new file mode 100644
index 0000000..1397b8a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/InsertTableDesc.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+
+@Explain(displayName = "Insert", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED })
+public class InsertTableDesc extends DDLDesc {
+  private final Table table;
+  private final boolean overwrite;
+
+  public InsertTableDesc(Table table, boolean overwrite) {
+    this.table = table;
+    this.overwrite = overwrite;
+  }
+
+  public Table getTable() {
+    return table;
+  }
+
+  public boolean isOverwrite() {
+    return overwrite;
+  }
+}