Posted to commits@hive.apache.org by jc...@apache.org on 2017/05/02 08:25:54 UTC
hive git commit: HIVE-16518: Insert override for druid does not replace all existing segments (Nishant Bangarwa, reviewed by Jesus Camacho Rodriguez)
Repository: hive
Updated Branches:
refs/heads/master 0a235d8e1 -> 53b70bdc3
HIVE-16518: Insert override for druid does not replace all existing segments (Nishant Bangarwa, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53b70bdc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53b70bdc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53b70bdc
Branch: refs/heads/master
Commit: 53b70bdc376e0b285677c0aad67836ecc901438c
Parents: 0a235d8
Author: Nishant Bangarwa <ni...@gmail.com>
Authored: Tue May 2 09:25:51 2017 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Tue May 2 09:25:51 2017 +0100
----------------------------------------------------------------------
.../hadoop/hive/druid/DruidStorageHandler.java | 27 ++++---
.../hive/druid/DruidStorageHandlerUtils.java | 72 ++++++++++++++++--
.../hive/druid/TestDruidStorageHandler.java | 78 ++++++++++++++++++--
3 files changed, 152 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
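The heart of the fix: an insert overwrite now disables every existing segment of the target datasource and inserts the new segment rows inside a single metadata-store transaction, where the old path only appended new rows via SQLMetadataStorageUpdaterJobHandler and left the previous segments marked used. Below is a minimal standalone sketch of that pattern against the JDBI v2 API; the table name druid_segments and the reduced column set are illustrative stand-ins (the committed code takes both from MetadataStorageTablesConfig and writes the full Druid segments schema, as shown in the DruidStorageHandlerUtils.java hunk further down).

import java.util.List;
import java.util.Map;

import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;

public final class OverwritePublishSketch {

  // Replaces all segments of a datasource in one transaction: readers of the
  // metadata store never observe old and new segments marked used at the same
  // time, and a failure during the insert rolls the disable back as well.
  public static void publishOverwrite(DBI dbi, final String dataSource,
      final List<Map<String, Object>> newSegmentRows) {
    dbi.inTransaction(new TransactionCallback<Void>() {
      @Override
      public Void inTransaction(Handle handle, TransactionStatus status) throws Exception {
        // Step 1: mark every existing segment of this datasource as unused.
        handle.createStatement(
            "UPDATE druid_segments SET used = false WHERE dataSource = :dataSource")
            .bind("dataSource", dataSource)
            .execute();
        // Step 2: batch-insert the freshly built segments with used = true.
        PreparedBatch batch = handle.prepareBatch(
            "INSERT INTO druid_segments (id, dataSource, used, payload) "
                + "VALUES (:id, :dataSource, :used, :payload)");
        for (Map<String, Object> row : newSegmentRows) {
          batch.add(row);
        }
        batch.execute();
        return null;
      }
    });
  }
}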
http://git-wip-us.apache.org/repos/asf/hive/blob/53b70bdc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index d4f6865..daee2fe 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -33,7 +33,6 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
-import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
@@ -56,7 +55,6 @@ import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -96,8 +94,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
private final SQLMetadataConnector connector;
- private final SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler;
-
private final MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
private HttpClient httpClient;
@@ -151,17 +147,14 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
} else {
throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", dbType));
}
- druidSqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(connector);
}
@VisibleForTesting
public DruidStorageHandler(SQLMetadataConnector connector,
- SQLMetadataStorageUpdaterJobHandler druidSqlMetadataStorageUpdaterJobHandler,
MetadataStorageTablesConfig druidMetadataStorageTablesConfig,
HttpClient httpClient
) {
this.connector = connector;
- this.druidSqlMetadataStorageUpdaterJobHandler = druidSqlMetadataStorageUpdaterJobHandler;
this.druidMetadataStorageTablesConfig = druidMetadataStorageTablesConfig;
this.httpClient = httpClient;
}
@@ -256,6 +249,12 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
@Override
public void commitCreateTable(Table table) throws MetaException {
+ LOG.debug(String.format("commit create table [%s]", table.getTableName()));
+ publishSegments(table, true);
+ }
+
+
+ public void publishSegments(Table table, boolean overwrite) throws MetaException {
if (MetaStoreUtils.isExternalTable(table)) {
return;
}
@@ -266,15 +265,19 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
List<DataSegment> segmentList = DruidStorageHandlerUtils
.getPublishedSegments(tableDir, getConf());
LOG.info(String.format("Found [%d] segments under path [%s]", segmentList.size(), tableDir));
- druidSqlMetadataStorageUpdaterJobHandler.publishSegments(
- druidMetadataStorageTablesConfig.getSegmentsTable(),
+ final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
+
+ DruidStorageHandlerUtils.publishSegments(
+ connector,
+ druidMetadataStorageTablesConfig,
+ dataSourceName,
segmentList,
- DruidStorageHandlerUtils.JSON_MAPPER
+ DruidStorageHandlerUtils.JSON_MAPPER,
+ overwrite
);
final String coordinatorAddress = HiveConf
.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
int maxTries = HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
- final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
LOG.info(String.format("checking load status from coordinator [%s]", coordinatorAddress));
// check if the coordinator is up
@@ -488,7 +491,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
if (overwrite) {
LOG.debug(String.format("commit insert overwrite into table [%s]", table.getTableName()));
- this.commitCreateTable(table);
+ this.publishSegments(table, overwrite);
} else {
throw new MetaException("Insert into is not supported yet");
}
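With this hunk commitCreateTable becomes a thin wrapper over publishSegments(table, true), and commitInsertTable(table, overwrite) passes its flag straight through, so CREATE TABLE and INSERT OVERWRITE TABLE now share a single publication path. For a Hive user the visible effect is that INSERT OVERWRITE TABLE on a Druid-backed table replaces the whole datasource instead of leaving previously published segments active alongside the new ones, which is exactly the bug in the subject line.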
http://git-wip-us.apache.org/repos/asf/hive/blob/53b70bdc/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 8d48e14..adf013b 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.common.collect.Lists;
@@ -43,6 +44,8 @@ import io.druid.segment.IndexMergerV9;
import io.druid.segment.column.ColumnConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
+import io.druid.timeline.partition.NoneShardSpec;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -54,9 +57,11 @@ import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.StringUtils;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
@@ -335,14 +340,7 @@ public final class DruidStorageHandlerUtils {
new HandleCallback<Void>() {
@Override
public Void withHandle(Handle handle) throws Exception {
- handle.createStatement(
- String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource",
- metadataStorageTablesConfig.getSegmentsTable()
- )
- )
- .bind("dataSource", dataSource)
- .execute();
-
+ disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource);
return null;
}
}
@@ -355,6 +353,64 @@ public final class DruidStorageHandlerUtils {
return true;
}
+ public static void publishSegments(final SQLMetadataConnector connector,
+ final MetadataStorageTablesConfig metadataStorageTablesConfig,
+ final String dataSource,
+ final List<DataSegment> segments, final ObjectMapper mapper, boolean overwrite)
+ {
+ connector.getDBI().inTransaction(
+ new TransactionCallback<Void>()
+ {
+ @Override
+ public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
+ {
+ if(overwrite){
+ disableDataSourceWithHandle(handle, metadataStorageTablesConfig, dataSource);
+ }
+ final PreparedBatch batch = handle.prepareBatch(
+ String.format(
+ "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+ metadataStorageTablesConfig.getSegmentsTable()
+ )
+ );
+ for (final DataSegment segment : segments) {
+
+ batch.add(
+ new ImmutableMap.Builder<String, Object>()
+ .put("id", segment.getIdentifier())
+ .put("dataSource", segment.getDataSource())
+ .put("created_date", new DateTime().toString())
+ .put("start", segment.getInterval().getStart().toString())
+ .put("end", segment.getInterval().getEnd().toString())
+ .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
+ .put("version", segment.getVersion())
+ .put("used", true)
+ .put("payload", mapper.writeValueAsBytes(segment))
+ .build()
+ );
+
+ LOG.info("Published %s", segment.getIdentifier());
+
+ }
+ batch.execute();
+
+ return null;
+ }
+ }
+ );
+ }
+
+ public static void disableDataSourceWithHandle(Handle handle, MetadataStorageTablesConfig metadataStorageTablesConfig, String dataSource){
+ handle.createStatement(
+ String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource",
+ metadataStorageTablesConfig.getSegmentsTable()
+ )
+ )
+ .bind("dataSource", dataSource)
+ .execute();
+ }
+
/**
* @param connector SQL connector to metadata
* @param metadataStorageTablesConfig Tables configuration
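Two details of the new publishSegments above are worth calling out. The disable and the batch insert run inside one inTransaction callback, so a failure during the insert rolls back the UPDATE and the datasource is never left with zero used segments. Also, the INSERT statement double-quotes the "end" column because end is a reserved word in SQL; note that the double-quoted identifier form is accepted by Derby and PostgreSQL but, by default, not by MySQL unless the ANSI_QUOTES mode is enabled.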
http://git-wip-us.apache.org/repos/asf/hive/blob/53b70bdc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index da6610a..05e3ec5 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -18,10 +18,13 @@
package org.apache.hadoop.hive.druid;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.indexer.JobHelper;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.SQLMetadataSegmentManager;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
@@ -42,10 +45,16 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
import java.io.IOException;
import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -85,7 +94,6 @@ public class TestDruidStorageHandler {
public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
- new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
derbyConnectorRule.metadataTablesConfigSupplier().get(),
null
);
@@ -114,7 +122,6 @@ public class TestDruidStorageHandler {
);
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
- new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
derbyConnectorRule.metadataTablesConfigSupplier().get(),
null
);
@@ -126,7 +133,6 @@ public class TestDruidStorageHandler {
throws MetaException, IOException {
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
- new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
derbyConnectorRule.metadataTablesConfigSupplier().get(),
null
);
@@ -158,7 +164,6 @@ public class TestDruidStorageHandler {
public void testCommitInsertTable() throws MetaException, IOException {
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
- new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
derbyConnectorRule.metadataTablesConfigSupplier().get(),
null
);
@@ -184,7 +189,6 @@ public class TestDruidStorageHandler {
public void testDeleteSegment() throws IOException, SegmentLoadingException {
DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
derbyConnectorRule.getConnector(),
- new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
derbyConnectorRule.metadataTablesConfigSupplier().get(),
null
);
@@ -221,4 +225,68 @@ public class TestDruidStorageHandler {
localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
);
}
+
+ @Test
+ public void testCommitInsertOverwriteTable() throws MetaException, IOException {
+ DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector();
+ MetadataStorageTablesConfig metadataStorageTablesConfig = derbyConnectorRule
+ .metadataTablesConfigSupplier().get();
+
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ connector,
+ metadataStorageTablesConfig,
+ null
+ );
+ druidStorageHandler.preCreateTable(tableMock);
+ Configuration config = new Configuration();
+ config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+ config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
+ druidStorageHandler.setConf(config);
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+ Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+ Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+ new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+ );
+ List<DataSegment> existingSegments = Arrays.asList(DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v0")
+ .interval(new Interval(1, 10)).shardSpec(NoneShardSpec.instance()).build());
+ DruidStorageHandlerUtils.publishSegments(connector, metadataStorageTablesConfig, DATA_SOURCE_NAME,
+ existingSegments,
+ DruidStorageHandlerUtils.JSON_MAPPER,
+ true
+ );
+ DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+ druidStorageHandler.commitInsertTable(tableMock, true);
+ Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+ DruidStorageHandlerUtils.getAllDataSourceNames(connector,
+ metadataStorageTablesConfig
+ )).toArray());
+
+ final List<DataSegment> dataSegmentList = connector.getDBI()
+ .withHandle(new HandleCallback<List<DataSegment>>() {
+ @Override
+ public List<DataSegment> withHandle(Handle handle) throws Exception {
+ return handle
+ .createQuery(String.format("SELECT payload FROM %s WHERE used=true",
+ metadataStorageTablesConfig.getSegmentsTable()))
+ .map(new ResultSetMapper<DataSegment>() {
+
+ @Override
+ public DataSegment map(int i, ResultSet resultSet,
+ StatementContext statementContext)
+ throws SQLException {
+ try {
+ return DruidStorageHandlerUtils.JSON_MAPPER.readValue(
+ resultSet.getBytes("payload"),
+ DataSegment.class
+ );
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ }).list();
+ }
+ });
+ Assert.assertEquals(1, dataSegmentList.size());
+
+ }
}
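The new testCommitInsertOverwriteTable pins the fixed behavior down end to end: it publishes a pre-existing v0 segment for the datasource, writes a descriptor for a fresh segment, commits the insert with overwrite=true, and then reads the segments table back to assert that exactly one row remains with used=true, i.e. the old segment was disabled rather than left active beside the new one.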