Posted to commits@hive.apache.org by ha...@apache.org on 2018/04/14 17:07:37 UTC
hive git commit: HIVE-19175 : Assert that Insert into Druid Table fails if the publishing of metadata by HS2 fails (Slim Bouguerra via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 86b678f50 -> 1eea5a80d
HIVE-19175 : Assert that Insert into Druid Table fails if the publishing of metadata by HS2 fails (Slim Bouguerra via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1eea5a80
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1eea5a80
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1eea5a80
Branch: refs/heads/master
Commit: 1eea5a80ded2df33d57b2296b3bed98cb18383fd
Parents: 86b678f
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Tue Apr 10 17:03:00 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sat Apr 14 10:07:23 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/druid/DruidStorageHandler.java | 69 +++++++-----
.../hive/druid/DruidStorageHandlerUtils.java | 9 +-
.../hadoop/hive/druid/io/DruidRecordWriter.java | 2 +-
.../druid/DruidStorageHandlerUtilsTest.java | 32 ++++++
.../clientpositive/druidmini_test_insert.q | 27 ++++-
.../druid/druidmini_test_insert.q.out | 109 +++++++++++++++++++
6 files changed, 208 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index c0feb8d..33387b2 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -26,7 +26,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.RetryUtils;
import com.metamx.common.lifecycle.Lifecycle;
@@ -118,6 +117,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
+
/**
* DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
*/
@@ -329,7 +330,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
null
), "UTF-8");
- Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+ Map<String, Object> inputParser = JSON_MAPPER
.convertValue(inputRowParser, Map.class);
final DataSchema dataSchema = new DataSchema(
dataSourceName,
@@ -414,7 +415,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) {
try {
- String task = DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(spec);
+ String task = JSON_MAPPER.writeValueAsString(spec);
console.printInfo("submitting kafka Spec {}", task);
LOG.info("submitting kafka Supervisor Spec {}", task);
@@ -422,7 +423,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
.setContent(
"application/json",
- DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsBytes(spec)),
+ JSON_MAPPER.writeValueAsBytes(spec)),
new StatusResponseHandler(
Charset.forName("UTF-8"))).get();
if (response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -504,7 +505,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
input -> input instanceof IOException,
getMaxRetryCount());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
- return DruidStorageHandlerUtils.JSON_MAPPER
+ return JSON_MAPPER
.readValue(response.getContent(), KafkaSupervisorSpec.class);
// Druid Returns 400 Bad Request when not found.
} else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) {
@@ -522,38 +523,47 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
protected void loadDruidSegments(Table table, boolean overwrite) throws MetaException {
- // at this point we have Druid segments from reducers but we need to atomically
- // rename and commit to metadata
+
final String dataSourceName = table.getParameters().get(Constants.DRUID_DATA_SOURCE);
- final List<DataSegment> segmentList = Lists.newArrayList();
- final Path tableDir = getSegmentDescriptorDir();
- // Read the created segments metadata from the table staging directory
+ final Path segmentDescriptorDir = getSegmentDescriptorDir();
try {
- segmentList.addAll(DruidStorageHandlerUtils.getCreatedSegments(tableDir, getConf()));
+ if (!segmentDescriptorDir.getFileSystem(getConf()).exists(segmentDescriptorDir)) {
+ LOG.info(
+ "Directory {} does not exist, ignore this if it is create statement or inserts of 0 rows,"
+ + " no Druid segments to move, cleaning working directory {}",
+ segmentDescriptorDir.getName(), getStagingWorkingDir().getName()
+ );
+ cleanWorkingDir();
+ return;
+ }
} catch (IOException e) {
- LOG.error("Failed to load segments descriptor from directory {}", tableDir.toString());
- Throwables.propagate(e);
+ LOG.error("Failed to load segments descriptor from directory {}", segmentDescriptorDir.toString());
cleanWorkingDir();
+ Throwables.propagate(e);
}
- // Moving Druid segments and committing to druid metadata as one transaction.
- final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
- List<DataSegment> publishedDataSegmentList = Lists.newArrayList();
- final String segmentDirectory =
- table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
- ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
- : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
- LOG.info(String.format(
- "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
- segmentList.size(),
- getStagingWorkingDir(),
- segmentDirectory
- ));
- hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
try {
+ // at this point we have Druid segments from reducers but we need to atomically
+ // rename and commit to metadata
+ // Moving Druid segments and committing to druid metadata as one transaction.
+ List<DataSegment> segmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptorDir, getConf());
+ final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
+ List<DataSegment> publishedDataSegmentList;
+ final String segmentDirectory =
+ table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
+ ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
+ : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+ LOG.info(String.format(
+ "Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]",
+ segmentList.size(),
+ getStagingWorkingDir(),
+ segmentDirectory
+
+ ));
+ hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
DataSegmentPusher dataSegmentPusher = new HdfsDataSegmentPusher(hdfsSegmentPusherConfig,
getConf(),
- DruidStorageHandlerUtils.JSON_MAPPER
+ JSON_MAPPER
);
publishedDataSegmentList = DruidStorageHandlerUtils.publishSegmentsAndCommit(
getConnector(),
@@ -564,7 +574,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
getConf(),
dataSegmentPusher
);
-
+ checkLoadStatus(publishedDataSegmentList);
} catch (CallbackFailedException | IOException e) {
LOG.error("Failed to move segments from staging directory");
if (e instanceof CallbackFailedException) {
@@ -574,7 +584,6 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
} finally {
cleanWorkingDir();
}
- checkLoadStatus(publishedDataSegmentList);
}
/**
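
The net effect of the hunk above: instead of silently treating a missing segment-descriptor directory as an empty segment list, the handler now distinguishes the legitimate no-op case (CREATE TABLE, or an insert of zero rows) with an explicit existence check and early return, and runs the read-push-commit sequence plus the post-publish checkLoadStatus inside one try/finally, so any failure while publishing metadata propagates and fails the INSERT while the staging directory still gets cleaned. A minimal self-contained sketch of the reordered flow (the class and helper signatures below are hypothetical stand-ins, not the actual Hive types):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.List;

abstract class CommitFlowSketch {

  void loadSegments(Path segmentDescriptorDir, Configuration conf) throws IOException {
    FileSystem fs = segmentDescriptorDir.getFileSystem(conf);
    if (!fs.exists(segmentDescriptorDir)) {
      // New early exit: nothing was written (CREATE TABLE or zero-row insert),
      // so clean up the staging directory and succeed as a no-op.
      cleanWorkingDir();
      return;
    }
    try {
      // Everything that can fail, including the post-publish load-status
      // check, sits inside the try, so a failed metadata publish fails the
      // whole statement instead of being observed after cleanup.
      List<?> published = publishAndCommit(segmentDescriptorDir, conf);
      checkLoadStatus(published);
    } finally {
      cleanWorkingDir();
    }
  }

  abstract List<?> publishAndCommit(Path dir, Configuration conf) throws IOException;
  abstract void checkLoadStatus(List<?> published);
  abstract void cleanWorkingDir();
}
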
http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 1aef565..808351d 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -299,14 +299,7 @@ public final class DruidStorageHandlerUtils {
ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = ImmutableList.builder();
FileSystem fs = taskDir.getFileSystem(conf);
FileStatus[] fss;
- try {
- fss = fs.listStatus(taskDir);
- } catch (FileNotFoundException e) {
- // This is a CREATE TABLE statement or query executed for CTAS/INSERT
- // did not produce any result. We do not need to do anything, this is
- // expected behavior.
- return publishedSegmentsBuilder.build();
- }
+ fss = fs.listStatus(taskDir);
for (FileStatus fileStatus : fss) {
final DataSegment segment = JSON_MAPPER
.readValue((InputStream) fs.open(fileStatus.getPath()), DataSegment.class);
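
The companion change: getCreatedSegments no longer swallows FileNotFoundException from listStatus, since the caller now checks for the directory up front; an unexpectedly missing task directory surfaces as an error rather than as "no segments". A regression-style check illustrating the new contract (hypothetical test, not part of this commit; it assumes the default local filesystem, whose listStatus throws FileNotFoundException for a missing path):

package org.apache.hadoop.hive.druid;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.FileNotFoundException;

public class GetCreatedSegmentsContractTest {

  // Hypothetical: a missing directory now propagates as FileNotFoundException
  // instead of yielding an empty segment list.
  @Test(expected = FileNotFoundException.class)
  public void missingTaskDirPropagates() throws Exception {
    Path missing = new Path("/tmp/no-such-task-dir-" + System.nanoTime());
    DruidStorageHandlerUtils.getCreatedSegments(missing, new Configuration());
  }
}
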
http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 8ab34a8..d1f0d98 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -325,7 +325,7 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
@Override
public void close(Reporter reporter) throws IOException {
- this.close(true);
+ this.close(false);
}
}
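
A one-line change with real semantics: the boolean argument of the other close overload appears to be an abort flag, so the MapReduce-side close(Reporter) previously aborted (discarding buffered segments) and now performs a normal commit. A simplified sketch of that convention (an assumption about the shape of DruidRecordWriter.close(boolean), not a verbatim copy):

import java.io.IOException;

class AbortFlagCloseSketch {
  void close(boolean abort) throws IOException {
    try {
      if (!abort) {
        commitSegments(); // normal completion: persist segments, write descriptors
      }
    } finally {
      releaseResources(); // clean up working state either way
    }
  }

  void commitSegments() throws IOException { /* hypothetical helper */ }
  void releaseResources() { /* hypothetical helper */ }
}
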
http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
new file mode 100644
index 0000000..d079e4f
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class DruidStorageHandlerUtilsTest {
+
+ @Test public void testCreateSelectStarQuery() throws IOException {
+ Assert.assertTrue("this should not be null",
+ DruidStorageHandlerUtils.createSelectStarQuery("dummy_ds").contains("dummy_ds"));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/ql/src/test/queries/clientpositive/druidmini_test_insert.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidmini_test_insert.q b/ql/src/test/queries/clientpositive/druidmini_test_insert.q
index f4cb666..c14a1b6 100644
--- a/ql/src/test/queries/clientpositive/druidmini_test_insert.q
+++ b/ql/src/test/queries/clientpositive/druidmini_test_insert.q
@@ -53,7 +53,30 @@ SELECT COUNT(*) FROM druid_alltypesorc;
DROP TABLE druid_alltypesorc;
-
+ -- Test create then insert
+
+ create database druid_test_create_then_insert;
+ use druid_test_create_then_insert;
+
+ create table test_table(`timecolumn` timestamp, `userid` string, `num_l` float);
+
+ insert into test_table values ('2015-01-08 00:00:00', 'i1-start', 4);
+ insert into test_table values ('2015-01-08 23:59:59', 'i1-end', 1);
+
+ CREATE TABLE druid_table (`__time` timestamp with local time zone, `userid` string, `num_l` float)
+ STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+ TBLPROPERTIES ("druid.segment.granularity" = "DAY");
+
+
+ INSERT INTO TABLE druid_table
+ select cast(`timecolumn` as timestamp with local time zone) as `__time`, `userid`, `num_l` FROM test_table;
+
+ select count(*) FROM druid_table;
+
+ DROP TABLE test_table;
+ DROP TABLE druid_table;
+ DROP DATABASE druid_test_create_then_insert;
+
-- Day light saving time test insert into test
create database druid_test_dst;
@@ -116,3 +139,5 @@ EXPLAIN select * from druid_test_table where `__time` = cast('2015-03-10 23:59:5
DROP TABLE test_base_table;
DROP TABLE druid_test_table;
+
+drop database druid_test_dst;
http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out b/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
index 482554b..c471795 100644
--- a/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
@@ -148,6 +148,107 @@ POSTHOOK: query: DROP TABLE druid_alltypesorc
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: default@druid_alltypesorc
POSTHOOK: Output: default@druid_alltypesorc
+PREHOOK: query: -- Test create then insert
+
+ create database druid_test_create_then_insert
+PREHOOK: type: CREATEDATABASE
+PREHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: query: -- Test create then insert
+
+ create database druid_test_create_then_insert
+POSTHOOK: type: CREATEDATABASE
+POSTHOOK: Output: database:druid_test_create_then_insert
+PREHOOK: query: use druid_test_create_then_insert
+PREHOOK: type: SWITCHDATABASE
+PREHOOK: Input: database:druid_test_create_then_insert
+POSTHOOK: query: use druid_test_create_then_insert
+POSTHOOK: type: SWITCHDATABASE
+POSTHOOK: Input: database:druid_test_create_then_insert
+PREHOOK: query: create table test_table(`timecolumn` timestamp, `userid` string, `num_l` float)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:druid_test_create_then_insert
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: create table test_table(`timecolumn` timestamp, `userid` string, `num_l` float)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+PREHOOK: query: insert into test_table values ('2015-01-08 00:00:00', 'i1-start', 4)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: insert into test_table values ('2015-01-08 00:00:00', 'i1-start', 4)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: Lineage: test_table.num_l SCRIPT []
+POSTHOOK: Lineage: test_table.timecolumn SCRIPT []
+POSTHOOK: Lineage: test_table.userid SCRIPT []
+PREHOOK: query: insert into test_table values ('2015-01-08 23:59:59', 'i1-end', 1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: insert into test_table values ('2015-01-08 23:59:59', 'i1-end', 1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: Lineage: test_table.num_l SCRIPT []
+POSTHOOK: Lineage: test_table.timecolumn SCRIPT []
+POSTHOOK: Lineage: test_table.userid SCRIPT []
+PREHOOK: query: CREATE TABLE druid_table (`__time` timestamp with local time zone, `userid` string, `num_l` float)
+ STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+ TBLPROPERTIES ("druid.segment.granularity" = "DAY")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:druid_test_create_then_insert
+PREHOOK: Output: druid_test_create_then_insert@druid_table
+POSTHOOK: query: CREATE TABLE druid_table (`__time` timestamp with local time zone, `userid` string, `num_l` float)
+ STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+ TBLPROPERTIES ("druid.segment.granularity" = "DAY")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: Output: druid_test_create_then_insert@druid_table
+PREHOOK: query: INSERT INTO TABLE druid_table
+ select cast(`timecolumn` as timestamp with local time zone) as `__time`, `userid`, `num_l` FROM test_table
+PREHOOK: type: QUERY
+PREHOOK: Input: druid_test_create_then_insert@test_table
+PREHOOK: Output: druid_test_create_then_insert@druid_table
+POSTHOOK: query: INSERT INTO TABLE druid_table
+ select cast(`timecolumn` as timestamp with local time zone) as `__time`, `userid`, `num_l` FROM test_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: druid_test_create_then_insert@test_table
+POSTHOOK: Output: druid_test_create_then_insert@druid_table
+PREHOOK: query: select count(*) FROM druid_table
+PREHOOK: type: QUERY
+PREHOOK: Input: druid_test_create_then_insert@druid_table
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) FROM druid_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: druid_test_create_then_insert@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+2
+PREHOOK: query: DROP TABLE test_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: druid_test_create_then_insert@test_table
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: DROP TABLE test_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: druid_test_create_then_insert@test_table
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+PREHOOK: query: DROP TABLE druid_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: druid_test_create_then_insert@druid_table
+PREHOOK: Output: druid_test_create_then_insert@druid_table
+POSTHOOK: query: DROP TABLE druid_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: druid_test_create_then_insert@druid_table
+POSTHOOK: Output: druid_test_create_then_insert@druid_table
+PREHOOK: query: DROP DATABASE druid_test_create_then_insert
+PREHOOK: type: DROPDATABASE
+PREHOOK: Input: database:druid_test_create_then_insert
+PREHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: query: DROP DATABASE druid_test_create_then_insert
+POSTHOOK: type: DROPDATABASE
+POSTHOOK: Input: database:druid_test_create_then_insert
+POSTHOOK: Output: database:druid_test_create_then_insert
PREHOOK: query: create database druid_test_dst
PREHOOK: type: CREATEDATABASE
PREHOOK: Output: database:druid_test_dst
@@ -681,3 +782,11 @@ POSTHOOK: query: DROP TABLE druid_test_table
POSTHOOK: type: DROPTABLE
POSTHOOK: Input: druid_test_dst@druid_test_table
POSTHOOK: Output: druid_test_dst@druid_test_table
+PREHOOK: query: drop database druid_test_dst
+PREHOOK: type: DROPDATABASE
+PREHOOK: Input: database:druid_test_dst
+PREHOOK: Output: database:druid_test_dst
+POSTHOOK: query: drop database druid_test_dst
+POSTHOOK: type: DROPDATABASE
+POSTHOOK: Input: database:druid_test_dst
+POSTHOOK: Output: database:druid_test_dst