Posted to commits@hive.apache.org by jc...@apache.org on 2017/05/02 08:25:54 UTC

hive git commit: HIVE-16518: Insert override for druid does not replace all existing segments (Nishant Bangarwa, reviewed by Jesus Camacho Rodriguez)

Repository: hive
Updated Branches:
  refs/heads/master 0a235d8e1 -> 53b70bdc3


HIVE-16518: Insert override for druid does not replace all existing segments (Nishant Bangarwa, reviewed by Jesus Camacho Rodriguez)
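
Summary of the change: commitInsertTable(table, overwrite=true) previously delegated to
commitCreateTable(table), which only appended the freshly built segments to the Druid
metadata store. Segments published by earlier inserts stayed marked used=true, so an
INSERT OVERWRITE could leave stale segments visible next to the new data. Publication now
goes through DruidStorageHandlerUtils.publishSegments(), which, when overwriting, first
marks every existing segment of the data source unused within the same metadata
transaction.

A hedged illustration of the affected path (table and column names are hypothetical;
"druid.datasource" is the table property the handler reads as Constants.DRUID_DATA_SOURCE):

    CREATE TABLE wiki_druid (`__time` TIMESTAMP, page STRING, added INT)
    STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
    TBLPROPERTIES ("druid.datasource" = "wikipedia");

    -- Before HIVE-16518 this appended segments without disabling the old ones;
    -- with this patch, all previously published segments are replaced.
    INSERT OVERWRITE TABLE wiki_druid SELECT `__time`, page, added FROM src;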


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53b70bdc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53b70bdc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53b70bdc

Branch: refs/heads/master
Commit: 53b70bdc376e0b285677c0aad67836ecc901438c
Parents: 0a235d8
Author: Nishant Bangarwa <ni...@gmail.com>
Authored: Tue May 2 09:25:51 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Tue May 2 09:25:51 2017 +0100

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  | 27 ++++---
 .../hive/druid/DruidStorageHandlerUtils.java    | 72 ++++++++++++++++--
 .../hive/druid/TestDruidStorageHandler.java     | 78 ++++++++++++++++++--
 3 files changed, 152 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/53b70bdc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index d4f6865..daee2fe 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -33,7 +33,6 @@ import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.HttpClientConfig;
 import com.metamx.http.client.HttpClientInit;
-import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
 import io.druid.metadata.MetadataStorageConnectorConfig;
 import io.druid.metadata.MetadataStorageTablesConfig;
 import io.druid.metadata.SQLMetadataConnector;
@@ -56,7 +55,6 @@ import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -96,8 +94,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
 
   private final SQLMetadataConnector connector;
 
-  private final SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler;
-
   private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
 
   private HttpClient httpClient;
@@ -151,17 +147,14 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
     } else {
       throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
     }
-    druidSqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(connector);
   }
 
   @VisibleForTesting
   public DruidStorageHandler(SQLMetadataConnector connector,
-          SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler,
           MetadataStorageTablesConfig druidMetadataStorageTablesConfig,
           HttpClient httpClient
   ) {
     this.connector = connector;
-    this.druidSqlMetadataStorageUpdaterJobHandler = druidSqlMetadataStorageUpdaterJobHandler;
     this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
     this.httpClient = httpClient;
   }
@@ -256,6 +249,12 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
 
   @Override
   public void commitCreateTable(Table table) throws MetaException {
+    LOG.debug(String.format("commit create table [%s]", table.getTableName()));
+    publishSegments(table, true);
+  }
+
+
+  public void publishSegments(Table table, boolean overwrite) throws MetaException {
     if (MetaStoreUtils.isExternalTable(table)) {
       return;
     }
@@ -266,15 +265,19 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
       List<DataSegment> segmentList = DruidStorageHandlerUtils
               .getPublishedSegments(tableDir, getConf());
       LOG.info(String.format("Found [%d] segments under path [%s]", segmentList.size(), tableDir));
-      druidSqlMetadataStorageUpdaterJobHandler.publishSegments(
-              druidMetadataStorageTablesConfig.getSegmentsTable(),
+      final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+
+      DruidStorageHandlerUtils.publishSegments(
+              connector,
+              druidMetadataStorageTablesConfig,
+              dataSourceName,
               segmentList,
-              DruidStorageHandlerUtils.JSON_MAPPER
+              DruidStorageHandlerUtils.JSON_MAPPER,
+              overwrite
       );
       final String coordinatorAddress = HiveConf
               .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
       int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
-      final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
       LOG.info(String.format("checking load status from coordinator [%s]", coordinatorAddress));
 
       // check if the coordinator is up
@@ -488,7 +491,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
   public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
     if (overwrite) {
       LOG.debug(String.format("commit insert overwrite into table [%s]", table.getTableName()));
-      this.commitCreateTable(table);
+      this.publishSegments(table, overwrite);
     } else {
       throw new MetaException("Insert into is not supported yet");
     }
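
The handler-side effect: segment publication moves out of commitCreateTable() into the
shared publishSegments(Table, boolean overwrite) entry point, and the now-unused
SQLMetadataStorageUpdaterJobHandler field is dropped in favor of the util method. A
condensed reading aid, taken from the hunks above rather than the full source:

    @Override
    public void commitCreateTable(Table table) throws MetaException {
      // create replaces any segments already registered under the data source name
      publishSegments(table, true);
    }

    @Override
    public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
      if (overwrite) {
        // was: this.commitCreateTable(table), which appended without disabling
        publishSegments(table, overwrite);
      } else {
        throw new MetaException("Insert into is not supported yet");
      }
    }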

http://git-wip-us.apache.org/repos/asf/hive/blob/53b70bdc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 8d48e14..adf013b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Interner;
 import com.google.common.collect.Interners;
 import com.google.common.collect.Lists;
@@ -43,6 +44,8 @@ import io.druid.segment.IndexMergerV9;
 import io.druid.segment.column.ColumnConfig;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.LinearShardSpec;
+import io.druid.timeline.partition.NoneShardSpec;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -54,9 +57,11 @@ import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.util.StringUtils;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
 import org.skife.jdbi.v2.FoldController;
 import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
@@ -335,14 +340,7 @@ public final class DruidStorageHandlerUtils {
               new HandleCallback<Void>() {
                 @Override
                 public Void withHandle(Handle handle) throws Exception {
-                  handle.createStatement(
-                          String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource",
-                                  metadataStorageTablesConfig.getSegmentsTable()
-                          )
-                  )
-                          .bind("dataSource", dataSource)
-                          .execute();
-
+                  disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource);
                   return null;
                 }
               }
@@ -355,6 +353,64 @@ public final class DruidStorageHandlerUtils {
     return true;
   }
 
+  public static void publishSegments(final SQLMetadataConnector connector,
+      final MetadataStorageTablesConfig metadataStorageTablesConfig,
+      final String dataSource,
+      final List<DataSegment> segments, final ObjectMapper mapper, boolean overwrite)
+  {
+    connector.getDBI().inTransaction(
+        new TransactionCallback<Void>()
+        {
+          @Override
+          public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
+          {
+            if(overwrite){
+              disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource);
+            }
+            final PreparedBatch batch = handle.prepareBatch(
+                String.format(
+                    "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+                        + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+                    metadataStorageTablesConfig.getSegmentsTable()
+                )
+            );
+            for (final DataSegment segment : segments) {
+
+              batch.add(
+                  new ImmutableMap.Builder<String, Object>()
+                      .put("id", segment.getIdentifier())
+                      .put("dataSource", segment.getDataSource())
+                      .put("created_date", new DateTime().toString())
+                      .put("start", segment.getInterval().getStart().toString())
+                      .put("end", segment.getInterval().getEnd().toString())
+                      .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
+                      .put("version", segment.getVersion())
+                      .put("used", true)
+                      .put("payload", mapper.writeValueAsBytes(segment))
+                      .build()
+              );
+
+              LOG.info("Published %s", segment.getIdentifier());
+
+            }
+            batch.execute();
+
+            return null;
+          }
+        }
+    );
+  }
+
+  public static void disableDataSourceWithHandle(Handle handle, MetadataStorageTablesConfig metadataStorageTablesConfig, String dataSource){
+    handle.createStatement(
+        String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource",
+            metadataStorageTablesConfig.getSegmentsTable()
+        )
+    )
+        .bind("dataSource", dataSource)
+        .execute();
+  }
+
   /**
    * @param connector                   SQL connector to metadata
    * @param metadataStorageTablesConfig Tables configuration
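
Conceptually, publishSegments() with overwrite=true performs the following as a single
transaction (the table name druid_segments is illustrative; the real name comes from
MetadataStorageTablesConfig.getSegmentsTable()):

    -- 1) disableDataSourceWithHandle: retire everything currently published
    UPDATE druid_segments SET used = false WHERE dataSource = :dataSource;

    -- 2) batch-insert the new segments, marked used = true
    INSERT INTO druid_segments (id, dataSource, created_date, start, "end",
                                partitioned, version, used, payload)
    VALUES (:id, :dataSource, :created_date, :start, :end,
            :partitioned, :version, true, :payload);

Running both inside one transaction means readers of the metadata store never observe a
window in which the data source has no used segments at all.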

http://git-wip-us.apache.org/repos/asf/hive/blob/53b70bdc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index da6610a..05e3ec5 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.hive.druid;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import io.druid.indexer.JobHelper;
 import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataSegmentManager;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
@@ -42,10 +45,16 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -85,7 +94,6 @@ public class TestDruidStorageHandler {
   public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             null
     );
@@ -114,7 +122,6 @@ public class TestDruidStorageHandler {
     );
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             null
     );
@@ -126,7 +133,6 @@ public class TestDruidStorageHandler {
           throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             null
     );
@@ -158,7 +164,6 @@ public class TestDruidStorageHandler {
   public void testCommitInsertTable() throws MetaException, IOException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             null
     );
@@ -184,7 +189,6 @@ public class TestDruidStorageHandler {
   public void testDeleteSegment() throws IOException, SegmentLoadingException {
     DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
             derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
             derbyConnectorRule.metadataTablesConfigSupplier().get(),
             null
     );
@@ -221,4 +225,68 @@ public class TestDruidStorageHandler {
             localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
     );
   }
+
+  @Test
+  public void testCommitInsertOverwriteTable() throws MetaException, IOException {
+    DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector();
+    MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule
+        .metadataTablesConfigSupplier().get();
+
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+        connector,
+        metadataStorageTablesConfig,
+        null
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+    Configuration config = new Configuration();
+    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+        new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    List<DataSegment> existingSegments = Arrays.asList(DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v0")
+        .interval(new Interval(1, 10)).shardSpec(NoneShardSpec.instance()).build());
+    DruidStorageHandlerUtils.publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+        existingSegments,
+        DruidStorageHandlerUtils.JSON_MAPPER,
+        true
+        );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+    druidStorageHandler.commitInsertTable(tableMock, true);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+        DruidStorageHandlerUtils.getAllDataSourceNames(connector,
+            metadataStorageTablesConfig
+        )).toArray());
+
+    final List<DataSegment> dataSegmentList = connector.getDBI()
+        .withHandle(new HandleCallback<List<DataSegment>>() {
+          @Override
+          public List<DataSegment> withHandle(Handle handle) throws Exception {
+            return handle
+                .createQuery(String.format("SELECT payload FROM %s WHERE used=true",
+                    metadataStorageTablesConfig.getSegmentsTable()))
+                .map(new ResultSetMapper<DataSegment>() {
+
+                  @Override
+                  public DataSegment map(int i, ResultSet resultSet,
+                      StatementContext statementContext)
+                      throws SQLException {
+                    try {
+                      return DruidStorageHandlerUtils.JSON_MAPPER.readValue(
+                          resultSet.getBytes("payload"),
+                          DataSegment.class
+                      );
+                    } catch (IOException e) {
+                      throw Throwables.propagate(e);
+                    }
+                  }
+                }).list();
+          }
+        });
+    Assert.assertEquals(1, dataSegmentList.size());
+
+  }
 }
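
The new testCommitInsertOverwriteTable seeds the metadata store with a pre-existing "v0"
segment, writes a descriptor for a fresh segment under the staging directory, commits an
INSERT OVERWRITE, and asserts that exactly one used segment survives for the data source.
Assuming a standard checkout, the test should be runnable on its own with something like:

    mvn test -pl druid-handler -Dtest=TestDruidStorageHandler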