Posted to commits@hive.apache.org by jc...@apache.org on 2016/12/15 20:51:37 UTC
[3/4] hive git commit: HIVE-15277: Teach Hive how to create/delete
Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
new file mode 100644
index 0000000..7ac52c6
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Druids.SelectQueryBuilder;
+import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.Result;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.PagingSpec;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.timeboundary.TimeBoundaryQuery;
+import io.druid.query.timeboundary.TimeBoundaryResultValue;
+import org.apache.calcite.adapter.druid.DruidDateTimeUtils;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.joda.time.chrono.ISOChronology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Druid query based input format.
+ *
+ * Given a Druid query and the broker address, it sends the query to the
+ * broker, then retrieves and parses the results.
+ */
+public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>
+ implements org.apache.hadoop.mapred.InputFormat<NullWritable, DruidWritable> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidQueryBasedInputFormat.class);
+
+ @Override
+ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+ throws IOException {
+ return getInputSplits(job);
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ return Arrays.<InputSplit>asList(getInputSplits(context.getConfiguration()));
+ }
+
+ @SuppressWarnings("deprecation")
+ private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
+ String address = HiveConf.getVar(conf,
+ HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
+ );
+ if (StringUtils.isEmpty(address)) {
+ throw new IOException("Druid broker address not specified in configuration");
+ }
+ String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
+ String druidQueryType;
+ if (StringUtils.isEmpty(druidQuery)) {
+ // Empty, maybe because CBO did not run; we fall back to a
+ // full Select query
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Druid query is empty; creating Select query");
+ }
+ String dataSource = conf.get(Constants.DRUID_DATA_SOURCE);
+ if (dataSource == null) {
+ throw new IOException("Druid data source cannot be empty");
+ }
+ druidQuery = createSelectStarQuery(dataSource);
+ druidQueryType = Query.SELECT;
+ } else {
+ druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
+ if (druidQueryType == null) {
+ throw new IOException("Druid query type not recognized");
+ }
+ }
+
+ // Hive depends on FileSplits
+ Job job = new Job(conf);
+ JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
+ Path[] paths = FileInputFormat.getInputPaths(jobContext);
+
+ switch (druidQueryType) {
+ case Query.TIMESERIES:
+ case Query.TOPN:
+ case Query.GROUP_BY:
+ return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
+ case Query.SELECT:
+ return splitSelectQuery(conf, address, druidQuery, paths[0]);
+ default:
+ throw new IOException("Druid query type not recognized");
+ }
+ }
+
+ private static String createSelectStarQuery(String dataSource) throws IOException {
+ // Create Select query
+ SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
+ builder.dataSource(dataSource);
+ builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL));
+ builder.pagingSpec(PagingSpec.newSpec(1));
+ Map<String, Object> context = new HashMap<>();
+ context.put(Constants.DRUID_QUERY_FETCH, false);
+ builder.context(context);
+ return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
+ }
+
+ /* Splits the Select query depending on the configured threshold so that reads
+ * can be parallelized (the interval splitting is sketched after this file's diff) */
+ private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
+ String druidQuery, Path dummyPath
+ ) throws IOException {
+ final int selectThreshold = (int) HiveConf.getIntVar(
+ conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
+ final int numConnection = HiveConf
+ .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+ final Period readTimeout = new Period(
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+ SelectQuery query;
+ try {
+ query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+
+ final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+ if (isFetch) {
+ // If it has a limit, we use it and we do not split the query
+ return new HiveDruidSplit[] { new HiveDruidSplit(
+ address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+ }
+
+ // We do not know the number of rows, so we need to execute a
+ // Segment Metadata query to obtain it
+ SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
+ metadataBuilder.dataSource(query.getDataSource());
+ metadataBuilder.intervals(query.getIntervals());
+ metadataBuilder.merge(true);
+ metadataBuilder.analysisTypes();
+ SegmentMetadataQuery metadataQuery = metadataBuilder.build();
+ final Lifecycle lifecycle = new Lifecycle();
+ HttpClient client = HttpClientInit.createClient(
+ HttpClientConfig.builder().withNumConnections(numConnection)
+ .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
+ try {
+ lifecycle.start();
+ } catch (Exception e) {
+ LOG.error("Lifecycle start issue", e);
+ }
+ InputStream response;
+ try {
+ response = DruidStorageHandlerUtils.submitRequest(client,
+ DruidStorageHandlerUtils.createRequest(address, metadataQuery)
+ );
+ } catch (Exception e) {
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ } finally {
+ lifecycle.stop();
+ }
+
+ // Retrieve results
+ List<SegmentAnalysis> metadataList;
+ try {
+ metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+ new TypeReference<List<SegmentAnalysis>>() {
+ }
+ );
+ } catch (Exception e) {
+ response.close();
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ if (metadataList == null || metadataList.isEmpty()) {
+ throw new IOException("Connected to Druid but could not retrieve datasource information");
+ }
+ if (metadataList.size() != 1) {
+ throw new IOException("Information about segments should have been merged");
+ }
+
+ final long numRows = metadataList.get(0).getNumRows();
+
+ query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
+ if (numRows <= selectThreshold) {
+ // We are not going to split it
+ return new HiveDruidSplit[] { new HiveDruidSplit(address,
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath
+ ) };
+ }
+
+ // If the query does not specify a time interval, we obtain the total time range
+ // using a Time Boundary query. Then, we use that information to split the query
+ // according to the Select threshold configuration property
+ final List<Interval> intervals = new ArrayList<>();
+ if (query.getIntervals().size() == 1 && query.getIntervals().get(0).withChronology(
+ ISOChronology.getInstanceUTC()).equals(DruidTable.DEFAULT_INTERVAL)) {
+ // Default max and min, we should execute a time boundary query to get a
+ // more precise range
+ TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
+ timeBuilder.dataSource(query.getDataSource());
+ TimeBoundaryQuery timeQuery = timeBuilder.build();
+
+ try {
+ response = DruidStorageHandlerUtils.submitRequest(client,
+ DruidStorageHandlerUtils.createRequest(address, timeQuery)
+ );
+ } catch (Exception e) {
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+
+ // Retrieve results
+ List<Result<TimeBoundaryResultValue>> timeList;
+ try {
+ timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+ new TypeReference<List<Result<TimeBoundaryResultValue>>>() {
+ }
+ );
+ } catch (Exception e) {
+ response.close();
+ throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ if (timeList == null || timeList.isEmpty()) {
+ throw new IOException(
+ "Connected to Druid but could not retrieve time boundary information");
+ }
+ if (timeList.size() != 1) {
+ throw new IOException("We should obtain a single time boundary");
+ }
+
+ intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
+ timeList.get(0).getValue().getMaxTime().getMillis(), ISOChronology.getInstanceUTC()
+ ));
+ } else {
+ intervals.addAll(query.getIntervals());
+ }
+
+ // Create (numRows/default threshold) input splits
+ int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
+ List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
+ HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
+ for (int i = 0; i < numSplits; i++) {
+ // Create partial Select query
+ final SelectQuery partialQuery = query.withQuerySegmentSpec(
+ new MultipleIntervalSegmentSpec(newIntervals.get(i)));
+ splits[i] = new HiveDruidSplit(address,
+ DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath
+ );
+ }
+ return splits;
+ }
+
+ private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits
+ ) {
+ final long totalTime = DruidDateTimeUtils.extractTotalTime(intervals);
+ long startTime = intervals.get(0).getStartMillis();
+ long endTime = startTime;
+ long currTime = 0;
+ List<List<Interval>> newIntervals = new ArrayList<>();
+ for (int i = 0, posIntervals = 0; i < numSplits; i++) {
+ final long rangeSize = Math.round((double) (totalTime * (i + 1)) / numSplits) -
+ Math.round((double) (totalTime * i) / numSplits);
+ // Create the new interval(s)
+ List<Interval> currentIntervals = new ArrayList<>();
+ while (posIntervals < intervals.size()) {
+ final Interval interval = intervals.get(posIntervals);
+ final long expectedRange = rangeSize - currTime;
+ if (interval.getEndMillis() - startTime >= expectedRange) {
+ endTime = startTime + expectedRange;
+ currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
+ startTime = endTime;
+ currTime = 0;
+ break;
+ }
+ endTime = interval.getEndMillis();
+ currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
+ currTime += (endTime - startTime);
+ startTime = intervals.get(++posIntervals).getStartMillis();
+ }
+ newIntervals.add(currentIntervals);
+ }
+ assert endTime == intervals.get(intervals.size() - 1).getEndMillis();
+ return newIntervals;
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> getRecordReader(
+ org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter
+ )
+ throws IOException {
+ // We need to provide a different record reader for every type of Druid query.
+ // The reason is that the Druid results format differs for each query type.
+ final DruidQueryRecordReader<?, ?> reader;
+ final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
+ if (druidQueryType == null) {
+ reader = new DruidSelectQueryRecordReader(); // By default
+ reader.initialize((HiveDruidSplit) split, job);
+ return reader;
+ }
+ switch (druidQueryType) {
+ case Query.TIMESERIES:
+ reader = new DruidTimeseriesQueryRecordReader();
+ break;
+ case Query.TOPN:
+ reader = new DruidTopNQueryRecordReader();
+ break;
+ case Query.GROUP_BY:
+ reader = new DruidGroupByQueryRecordReader();
+ break;
+ case Query.SELECT:
+ reader = new DruidSelectQueryRecordReader();
+ break;
+ default:
+ throw new IOException("Druid query type not recognized");
+ }
+ reader.initialize((HiveDruidSplit) split, job);
+ return reader;
+ }
+
+ @Override
+ public RecordReader<NullWritable, DruidWritable> createRecordReader(InputSplit split,
+ TaskAttemptContext context
+ ) throws IOException, InterruptedException {
+ // We need to provide a different record reader for every type of Druid query.
+ // The reason is that the Druid results format differs for each query type.
+ final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
+ if (druidQueryType == null) {
+ return new DruidSelectQueryRecordReader(); // By default
+ }
+ final DruidQueryRecordReader<?, ?> reader;
+ switch (druidQueryType) {
+ case Query.TIMESERIES:
+ reader = new DruidTimeseriesQueryRecordReader();
+ break;
+ case Query.TOPN:
+ reader = new DruidTopNQueryRecordReader();
+ break;
+ case Query.GROUP_BY:
+ reader = new DruidGroupByQueryRecordReader();
+ break;
+ case Query.SELECT:
+ reader = new DruidSelectQueryRecordReader();
+ break;
+ default:
+ throw new IOException("Druid query type not recognized");
+ }
+ return reader;
+ }
+
+}
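
For reference, and not part of the patch: a minimal sketch of the even time-range splitting that splitSelectQuery/createSplitsIntervals perform, assuming a single input interval; the class and variable names below are illustrative only.

import java.util.ArrayList;
import java.util.List;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;

public class IntervalSplitSketch {
  // Cut a single interval into numSplits contiguous slices of (almost) equal length,
  // using the same cumulative rounding as createSplitsIntervals to avoid drift.
  static List<Interval> evenSlices(Interval total, int numSplits) {
    final long totalTime = total.getEndMillis() - total.getStartMillis();
    final List<Interval> slices = new ArrayList<>();
    long start = total.getStartMillis();
    for (int i = 0; i < numSplits; i++) {
      long end = total.getStartMillis()
          + Math.round((double) (totalTime * (i + 1)) / numSplits);
      slices.add(new Interval(start, end, ISOChronology.getInstanceUTC()));
      start = end;
    }
    return slices;
  }

  public static void main(String[] args) {
    // e.g. numRows = 25000 with HIVE_DRUID_SELECT_THRESHOLD = 10000 -> ceil(2.5) = 3 splits
    Interval day = new Interval("2016-12-01T00:00:00Z/2016-12-02T00:00:00Z");
    for (Interval slice : evenSlices(day, 3)) {
      System.out.println(slice);   // three 8-hour slices
    }
  }
}
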
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
new file mode 100644
index 0000000..1601a9a
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -0,0 +1,260 @@
+package org.apache.hadoop.hive.druid.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.Committer;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.SegmentNotWritableException;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.plumber.Committers;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritable>,
+ org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
+ protected static final Logger LOG = LoggerFactory.getLogger(DruidRecordWriter.class);
+
+ private final DataSchema dataSchema;
+
+ private final Appenderator appenderator;
+
+ private final RealtimeTuningConfig tuningConfig;
+
+ private final Path segmentsDescriptorDir;
+
+ private SegmentIdentifier currentOpenSegment = null;
+
+ private final Integer maxPartitionSize;
+
+ private final FileSystem fileSystem;
+
+ private final Supplier<Committer> committerSupplier;
+
+ public DruidRecordWriter(
+ DataSchema dataSchema,
+ RealtimeTuningConfig realtimeTuningConfig,
+ DataSegmentPusher dataSegmentPusher,
+ int maxPartitionSize,
+ final Path segmentsDescriptorsDir,
+ final FileSystem fileSystem
+ ) {
+ this.tuningConfig = Preconditions
+ .checkNotNull(realtimeTuningConfig, "realtimeTuningConfig is null");
+ this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
+ appenderator = Appenderators
+ .createOffline(this.dataSchema,
+ tuningConfig,
+ new FireDepartmentMetrics(), dataSegmentPusher,
+ DruidStorageHandlerUtils.JSON_MAPPER,
+ DruidStorageHandlerUtils.INDEX_IO,
+ DruidStorageHandlerUtils.INDEX_MERGER_V9
+ );
+ Preconditions.checkArgument(maxPartitionSize > 0, "maxPartitionSize needs to be greater than 0");
+ this.maxPartitionSize = maxPartitionSize;
+ appenderator.startJob(); // maybe we need to move this out of the constructor
+ this.segmentsDescriptorDir = Preconditions
+ .checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
+ this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is null");
+ committerSupplier = Suppliers.ofInstance(Committers.nil());
+ }
+
+ /**
+ * Computes the segment identifier for the given truncated timestamp and may push
+ * the current open segment. The push occurs when the maximum partition size is
+ * reached or when the event belongs to the next interval.
+ * Note that this function assumes that timestamps are pseudo-sorted (the sorting is
+ * done by the previous stage); it will close the current segment and move to the
+ * next segment granularity interval as soon as an event from the next interval appears.
+ *
+ * @return the SegmentIdentifier corresponding to the truncated timestamp.
+ */
+ private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
+
+ final Granularity segmentGranularity = dataSchema.getGranularitySpec()
+ .getSegmentGranularity();
+
+ final Interval interval = new Interval(
+ new DateTime(truncatedTime),
+ segmentGranularity.increment(new DateTime(truncatedTime))
+ );
+
+ SegmentIdentifier retVal;
+ if (currentOpenSegment == null) {
+ retVal = new SegmentIdentifier(
+ dataSchema.getDataSource(),
+ interval,
+ tuningConfig.getVersioningPolicy().getVersion(interval),
+ new LinearShardSpec(0)
+ );
+ currentOpenSegment = retVal;
+ return retVal;
+ } else if (currentOpenSegment.getInterval().equals(interval)) {
+ retVal = currentOpenSegment;
+ int rowCount = appenderator.getRowCount(retVal);
+ if (rowCount < maxPartitionSize) {
+ return retVal;
+ } else {
+ retVal = new SegmentIdentifier(
+ dataSchema.getDataSource(),
+ interval,
+ tuningConfig.getVersioningPolicy().getVersion(interval),
+ new LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1)
+ );
+ pushSegments(Lists.newArrayList(currentOpenSegment));
+ currentOpenSegment = retVal;
+ return retVal;
+ }
+ } else {
+ retVal = new SegmentIdentifier(
+ dataSchema.getDataSource(),
+ interval,
+ tuningConfig.getVersioningPolicy().getVersion(interval),
+ new LinearShardSpec(0)
+ );
+ pushSegments(Lists.newArrayList(currentOpenSegment));
+ currentOpenSegment = retVal;
+ return retVal;
+ }
+ }
+
+ private void pushSegments(List<SegmentIdentifier> segmentsToPush) {
+ try {
+ SegmentsAndMetadata segmentsAndMetadata = appenderator
+ .push(segmentsToPush, committerSupplier.get()).get();
+ final HashSet<String> pushedSegmentIdentifierHashSet = new HashSet<>();
+
+ for (DataSegment pushedSegment : segmentsAndMetadata.getSegments()) {
+ pushedSegmentIdentifierHashSet
+ .add(SegmentIdentifier.fromDataSegment(pushedSegment).getIdentifierAsString());
+ final Path segmentDescriptorOutputPath = DruidStorageHandlerUtils
+ .makeSegmentDescriptorOutputPath(pushedSegment, segmentsDescriptorDir);
+ DruidStorageHandlerUtils
+ .writeSegmentDescriptor(fileSystem, pushedSegment, segmentDescriptorOutputPath);
+
+ LOG.info(
+ String.format(
+ "Pushed the segment [%s] and persisted the descriptor located at [%s]",
+ pushedSegment,
+ segmentDescriptorOutputPath
+ )
+ );
+ }
+
+ final HashSet<String> toPushSegmentsHashSet = new HashSet<>(
+ FluentIterable.from(segmentsToPush)
+ .transform(new Function<SegmentIdentifier, String>() {
+ @Nullable
+ @Override
+ public String apply(
+ @Nullable SegmentIdentifier input
+ ) {
+ return input.getIdentifierAsString();
+ }
+ })
+ .toList());
+
+ if (!pushedSegmentIdentifierHashSet.equals(toPushSegmentsHashSet)) {
+ throw new IllegalStateException(String.format(
+ "was asked to publish [%s] but was able to publish only [%s]",
+ Joiner.on(", ").join(toPushSegmentsHashSet),
+ Joiner.on(", ").join(pushedSegmentIdentifierHashSet)
+ ));
+ }
+
+ LOG.info(String.format("Published [%,d] segments.", segmentsToPush.size()));
+ } catch (InterruptedException e) {
+ LOG.error(String.format("got interrupted, failed to push [%,d] segments.",
+ segmentsToPush.size()
+ ), e);
+ Thread.currentThread().interrupt();
+ } catch (IOException | ExecutionException e) {
+ LOG.error(String.format("Failed to push [%,d] segments.", segmentsToPush.size()), e);
+ Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void write(Writable w) throws IOException {
+ DruidWritable record = (DruidWritable) w;
+ final long timestamp = (long) record.getValue().get(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+ final long truncatedTime = (long) record.getValue()
+ .get(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
+
+ InputRow inputRow = new MapBasedInputRow(
+ timestamp,
+ dataSchema.getParser()
+ .getParseSpec()
+ .getDimensionsSpec()
+ .getDimensionNames(),
+ record.getValue()
+ );
+
+ try {
+ appenderator
+ .add(getSegmentIdentifierAndMaybePush(truncatedTime), inputRow, committerSupplier);
+ } catch (SegmentNotWritableException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ try {
+ if (!abort) {
+ final List<SegmentIdentifier> segmentsToPush = Lists.newArrayList();
+ segmentsToPush.addAll(appenderator.getSegments());
+ pushSegments(segmentsToPush);
+ }
+ appenderator.clear();
+ } catch (InterruptedException e) {
+ Throwables.propagate(e);
+ } finally {
+ appenderator.close();
+ }
+ }
+
+ @Override
+ public void write(NullWritable key, DruidWritable value) throws IOException {
+ this.write(value);
+ }
+
+ @Override
+ public void close(Reporter reporter) throws IOException {
+ this.close(true);
+ }
+
+}
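
The segment rollover logic in getSegmentIdentifierAndMaybePush can be summarized as follows; this is an illustrative sketch only (not part of the patch), with made-up names.

// Decides what to do with an incoming event given the currently open segment.
enum SegmentAction {
  OPEN_FIRST_SEGMENT,            // no open segment yet: open partition 0, nothing to push
  REUSE_CURRENT,                 // same interval and under maxPartitionSize: keep appending
  PUSH_AND_OPEN_NEXT_PARTITION,  // same interval but full: push, open partitionNum + 1
  PUSH_AND_OPEN_NEW_INTERVAL     // event from the next interval: push, open partition 0
}

final class SegmentRolloverSketch {
  static SegmentAction decide(boolean hasOpenSegment, boolean sameInterval,
      int openSegmentRowCount, int maxPartitionSize) {
    if (!hasOpenSegment) {
      return SegmentAction.OPEN_FIRST_SEGMENT;
    }
    if (sameInterval) {
      return openSegmentRowCount < maxPartitionSize
          ? SegmentAction.REUSE_CURRENT
          : SegmentAction.PUSH_AND_OPEN_NEXT_PARTITION;
    }
    return SegmentAction.PUSH_AND_OPEN_NEW_INTERVAL;
  }
}

Because events arrive pseudo-sorted by time (guaranteed by the previous stage), a segment interval is never reopened once an event from a later interval has been seen.
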
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
new file mode 100644
index 0000000..861075d
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Druid split. Its purpose is to trigger query execution in Druid.
+ */
+public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
+
+ private String address;
+
+ private String druidQuery;
+
+ // required for deserialization
+ public HiveDruidSplit() {
+ super((Path) null, 0, 0, (String[]) null);
+ }
+
+ public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
+ super(dummyPath, 0, 0, (String[]) null);
+ this.address = address;
+ this.druidQuery = druidQuery;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeUTF(address);
+ out.writeUTF(druidQuery);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ address = in.readUTF();
+ druidQuery = in.readUTF();
+ }
+
+ @Override
+ public long getLength() {
+ return 0L;
+ }
+
+ @Override
+ public String[] getLocations() {
+ return new String[] { "" };
+ }
+
+ public String getAddress() {
+ return address;
+ }
+
+ public String getDruidQuery() {
+ return druidQuery;
+ }
+
+ @Override
+ public String toString() {
+ return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
+ }
+
+}
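
Illustration only (not part of the patch): the broker address and the query JSON travel with the split through the standard Writable round trip; the address, query, and path below are hypothetical.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class HiveDruidSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    HiveDruidSplit split = new HiveDruidSplit(
        "localhost:8082",                          // hypothetical broker address
        "{\"queryType\":\"select\"}",              // hypothetical (abbreviated) query JSON
        new Path("/tmp/dummy"));                   // dummy path required by FileSplit
    DataOutputBuffer out = new DataOutputBuffer();
    split.write(out);                              // serializes path + address + query
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    HiveDruidSplit copy = new HiveDruidSplit();    // no-arg constructor used for deserialization
    copy.readFields(in);
    System.out.println(copy);                      // HiveDruidSplit{localhost:8082, {"queryType":"select"}}
  }
}
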
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index f97f820..9e8b439 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -42,7 +42,9 @@ public class DruidGroupByQueryRecordReader
extends DruidQueryRecordReader<GroupByQuery, Row> {
private Row current;
+
private int[] indexes = new int[0];
+
// Row objects returned by GroupByQuery have different access paths depending on
// whether the result for the metric is a Float or a Long, thus we keep track
// using these converters
@@ -62,11 +64,14 @@ public class DruidGroupByQueryRecordReader
@Override
protected List<Row> createResultsList(InputStream content) throws IOException {
return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
- new TypeReference<List<Row>>(){});
+ new TypeReference<List<Row>>() {
+ }
+ );
}
private void initExtractors() throws IOException {
- extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs().size()];
+ extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs()
+ .size()];
int counter = 0;
for (int i = 0; i < query.getAggregatorSpecs().size(); i++, counter++) {
AggregatorFactory af = query.getAggregatorSpecs().get(i);
@@ -103,7 +108,7 @@ public class DruidGroupByQueryRecordReader
if (results.hasNext()) {
current = results.next();
indexes = new int[query.getDimensions().size()];
- for (int i=0; i < query.getDimensions().size(); i++) {
+ for (int i = 0; i < query.getDimensions().size(); i++) {
DimensionSpec ds = query.getDimensions().get(i);
indexes[i] = current.getDimension(ds.getDimension()).size() - 1;
}
@@ -124,7 +129,7 @@ public class DruidGroupByQueryRecordReader
// 1) The timestamp column
value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
// 2) The dimension columns
- for (int i=0; i < query.getDimensions().size(); i++) {
+ for (int i = 0; i < query.getDimensions().size(); i++) {
DimensionSpec ds = query.getDimensions().get(i);
List<String> dims = current.getDimension(ds.getDimension());
if (dims.size() == 0) {
@@ -163,7 +168,7 @@ public class DruidGroupByQueryRecordReader
// 1) The timestamp column
value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
// 2) The dimension columns
- for (int i=0; i < query.getDimensions().size(); i++) {
+ for (int i = 0; i < query.getDimensions().size(); i++) {
DimensionSpec ds = query.getDimensions().get(i);
List<String> dims = current.getDimension(ds.getDimension());
if (dims.size() == 0) {
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index fe6213b..dc9d6a0 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -17,15 +17,16 @@
*/
package org.apache.hadoop.hive.druid.serde;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.collect.Iterators;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.BaseQuery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.druid.HiveDruidSplit;
+import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -34,24 +35,21 @@ import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Iterators;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.BaseQuery;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
/**
* Base record reader for a given Druid query. This class contains the logic to
* send the query to the broker and retrieve the results. The transformation to
* emit records needs to be done by the classes that extend the reader.
- *
+ *
* The key for each record will be a NullWritable, while the value will be a
* DruidWritable containing the timestamp as well as all values resulting from
* the query.
*/
-public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Comparable<R>>
+public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends Comparable<R>>
extends RecordReader<NullWritable, DruidWritable>
implements org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> {
@@ -83,6 +81,7 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Co
LOG.info("Retrieving from druid using query:\n " + query);
}
+ final Lifecycle lifecycle = new Lifecycle();
final int numConnection = HiveConf
.getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
final Period readTimeout = new Period(
@@ -90,10 +89,17 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Co
HttpClient client = HttpClientInit.createClient(
HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
- .withNumConnections(numConnection).build(), new Lifecycle());
- InputStream response = DruidStorageHandlerUtils.submitRequest(client,
- DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query));
+ .withNumConnections(numConnection).build(), lifecycle);
+ try {
+ lifecycle.start();
+ } catch (Exception e) {
+ LOG.error("Issues with lifecycle start", e);
+ }
+ InputStream response = DruidStorageHandlerUtils.submitRequest(client,
+ DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query)
+ );
+ lifecycle.stop();
// Retrieve results
List<R> resultsList;
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
index c30ac56..8a41e91 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
@@ -41,6 +41,7 @@ public class DruidSelectQueryRecordReader
extends DruidQueryRecordReader<SelectQuery, Result<SelectResultValue>> {
private Result<SelectResultValue> current;
+
private Iterator<EventHolder> values = Iterators.emptyIterator();
@Override
@@ -49,9 +50,12 @@ public class DruidSelectQueryRecordReader
}
@Override
- protected List<Result<SelectResultValue>> createResultsList(InputStream content) throws IOException {
+ protected List<Result<SelectResultValue>> createResultsList(InputStream content)
+ throws IOException {
return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
- new TypeReference<List<Result<SelectResultValue>>>(){});
+ new TypeReference<List<Result<SelectResultValue>>>() {
+ }
+ );
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index eb78a70..2e90df1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -17,19 +17,33 @@
*/
package org.apache.hadoop.hive.druid.serde;
-import java.io.IOException;
-import java.io.InputStream;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Properties;
-
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.metadata.metadata.ColumnAnalysis;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.topn.TopNQuery;
import org.apache.calcite.adapter.druid.DruidTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeSpec;
@@ -37,12 +51,19 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
@@ -50,43 +71,34 @@ import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Lists;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
-import io.druid.query.Query;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.PostAggregator;
-import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.groupby.GroupByQuery;
-import io.druid.query.metadata.metadata.ColumnAnalysis;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
-import io.druid.query.select.SelectQuery;
-import io.druid.query.timeseries.TimeseriesQuery;
-import io.druid.query.topn.TopNQuery;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
/**
* DruidSerDe that is used to deserialize objects from a Druid data source.
*/
-@SerDeSpec(schemaProps = {Constants.DRUID_DATA_SOURCE})
+@SerDeSpec(schemaProps = { Constants.DRUID_DATA_SOURCE })
public class DruidSerDe extends AbstractSerDe {
protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
private String[] columns;
+
private PrimitiveTypeInfo[] types;
- private ObjectInspector inspector;
private int numConnection;
private Period readTimeout;
+ private ObjectInspector inspector;
+
@Override
public void initialize(Configuration configuration, Properties properties) throws SerDeException {
final List<String> columnNames = new ArrayList<>();
@@ -96,56 +108,93 @@ public class DruidSerDe extends AbstractSerDe {
// Druid query
String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON);
if (druidQuery == null) {
- // No query. We need to create a Druid Segment Metadata query that retrieves all
- // columns present in the data source (dimensions and metrics).
- // Create Segment Metadata Query
- String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
- if (dataSource == null) {
- throw new SerDeException("Druid data source not specified; use " +
- Constants.DRUID_DATA_SOURCE + " in table properties");
- }
- SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
- builder.dataSource(dataSource);
- builder.merge(true);
- builder.analysisTypes();
- SegmentMetadataQuery query = builder.build();
+ // No query. Either it is a CTAS, or we need to create a Druid
+ // Segment Metadata query that retrieves all columns present in
+ // the data source (dimensions and metrics).
+ if (!org.apache.commons.lang3.StringUtils
+ .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS))
+ && !org.apache.commons.lang3.StringUtils
+ .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) {
+ columnNames.addAll(Utilities.getColumnNames(properties));
+ if (!columnNames.contains(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+ throw new SerDeException("Timestamp column (' " + DruidTable.DEFAULT_TIMESTAMP_COLUMN +
+ "') not specified in create table; list of columns is : " +
+ properties.getProperty(serdeConstants.LIST_COLUMNS));
+ }
+ columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties),
+ new Function<String, PrimitiveTypeInfo>() {
+ @Override
+ public PrimitiveTypeInfo apply(String type) {
+ return TypeInfoFactory.getPrimitiveTypeInfo(type);
+ }
+ }
+ ));
+ inspectors.addAll(Lists.transform(columnTypes,
+ new Function<PrimitiveTypeInfo, ObjectInspector>() {
+ @Override
+ public ObjectInspector apply(PrimitiveTypeInfo type) {
+ return PrimitiveObjectInspectorFactory
+ .getPrimitiveWritableObjectInspector(type);
+ }
+ }
+ ));
+ columns = columnNames.toArray(new String[columnNames.size()]);
+ types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+ inspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(columnNames, inspectors);
+ } else {
+ String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
+ if (dataSource == null) {
+ throw new SerDeException("Druid data source not specified; use " +
+ Constants.DRUID_DATA_SOURCE + " in table properties");
+ }
+ SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
+ builder.dataSource(dataSource);
+ builder.merge(true);
+ builder.analysisTypes();
+ SegmentMetadataQuery query = builder.build();
- // Execute query in Druid
- String address = HiveConf.getVar(configuration,
- HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
- if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
- throw new SerDeException("Druid broker address not specified in configuration");
- }
+ // Execute query in Druid
+ String address = HiveConf.getVar(configuration,
+ HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
+ );
+ if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
+ throw new SerDeException("Druid broker address not specified in configuration");
+ }
numConnection = HiveConf
.getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
readTimeout = new Period(
HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
- // Infer schema
- SegmentAnalysis schemaInfo;
- try {
- schemaInfo = submitMetadataRequest(address, query);
- } catch (IOException e) {
- throw new SerDeException(e);
- }
- for (Entry<String,ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
- if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
- // Special handling for timestamp column
+
+ // Infer schema
+ SegmentAnalysis schemaInfo;
+ try {
+ schemaInfo = submitMetadataRequest(address, query);
+ } catch (IOException e) {
+ throw new SerDeException(e);
+ }
+ for (Entry<String, ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
+ if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+ // Special handling for timestamp column
+ columnNames.add(columnInfo.getKey()); // field name
+ PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+ columnTypes.add(type);
+ inspectors
+ .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+ continue;
+ }
columnNames.add(columnInfo.getKey()); // field name
- PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+ PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
+ columnInfo.getValue().getType()); // field type
columnTypes.add(type);
inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
- continue;
}
- columnNames.add(columnInfo.getKey()); // field name
- PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
- columnInfo.getValue().getType()); // field type
- columnTypes.add(type);
- inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+ columns = columnNames.toArray(new String[columnNames.size()]);
+ types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+ inspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(columnNames, inspectors);
}
- columns = columnNames.toArray(new String[columnNames.size()]);
- types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
- inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
} else {
// Query is specified, we can extract the results schema from the query
Query<?> query;
@@ -171,13 +220,14 @@ public class DruidSerDe extends AbstractSerDe {
default:
throw new SerDeException("Not supported Druid query");
}
-
+
columns = new String[columnNames.size()];
types = new PrimitiveTypeInfo[columnNames.size()];
for (int i = 0; i < columnTypes.size(); ++i) {
columns[i] = columnNames.get(i);
types[i] = columnTypes.get(i);
- inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+ inspectors
+ .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
}
inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
}
@@ -192,22 +242,29 @@ public class DruidSerDe extends AbstractSerDe {
/* Submits the request and returns */
protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
throws SerDeException, IOException {
+ final Lifecycle lifecycle = new Lifecycle();
HttpClient client = HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(numConnection)
- .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
+ .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
InputStream response;
try {
+ lifecycle.start();
response = DruidStorageHandlerUtils.submitRequest(client,
- DruidStorageHandlerUtils.createRequest(address, query));
+ DruidStorageHandlerUtils.createRequest(address, query)
+ );
} catch (Exception e) {
throw new SerDeException(StringUtils.stringifyException(e));
+ } finally {
+ lifecycle.stop();
}
// Retrieve results
List<SegmentAnalysis> resultsList;
try {
resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
- new TypeReference<List<SegmentAnalysis>>() {});
+ new TypeReference<List<SegmentAnalysis>>() {
+ }
+ );
} catch (Exception e) {
response.close();
throw new SerDeException(StringUtils.stringifyException(e));
@@ -224,7 +281,8 @@ public class DruidSerDe extends AbstractSerDe {
/* Timeseries query */
private void inferSchema(TimeseriesQuery query, List<String> columnNames,
- List<PrimitiveTypeInfo> columnTypes) {
+ List<PrimitiveTypeInfo> columnTypes
+ ) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -241,7 +299,9 @@ public class DruidSerDe extends AbstractSerDe {
}
/* TopN query */
- private void inferSchema(TopNQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+ private void inferSchema(TopNQuery query, List<String> columnNames,
+ List<PrimitiveTypeInfo> columnTypes
+ ) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -262,7 +322,8 @@ public class DruidSerDe extends AbstractSerDe {
/* Select query */
private void inferSchema(SelectQuery query, List<String> columnNames,
- List<PrimitiveTypeInfo> columnTypes) {
+ List<PrimitiveTypeInfo> columnTypes
+ ) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -279,7 +340,9 @@ public class DruidSerDe extends AbstractSerDe {
}
/* GroupBy query */
- private void inferSchema(GroupByQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+ private void inferSchema(GroupByQuery query, List<String> columnNames,
+ List<PrimitiveTypeInfo> columnTypes
+ ) {
// Timestamp column
columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -302,17 +365,67 @@ public class DruidSerDe extends AbstractSerDe {
@Override
public Class<? extends Writable> getSerializedClass() {
- return NullWritable.class;
+ return DruidWritable.class;
}
@Override
public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
- return NullWritable.get();
+ if (objectInspector.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new SerDeException(getClass().toString()
+ + " can only serialize struct types, but we got: "
+ + objectInspector.getTypeName());
+ }
+
+ // Prepare the field ObjectInspectors
+ StructObjectInspector soi = (StructObjectInspector) objectInspector;
+ List<? extends StructField> fields = soi.getAllStructFieldRefs();
+ List<Object> values = soi.getStructFieldsDataAsList(o);
+ // We deserialize the result
+ Map<String, Object> value = new HashMap<>();
+ for (int i = 0; i < columns.length; i++) {
+ if (values.get(i) == null) {
+ // null, we just add it
+ value.put(columns[i], null);
+ continue;
+ }
+ final Object res;
+ switch (types[i].getPrimitiveCategory()) {
+ case TIMESTAMP:
+ res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector())
+ .getPrimitiveJavaObject(
+ values.get(i)).getTime();
+ break;
+ case LONG:
+ res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ case FLOAT:
+ res = ((FloatObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+ break;
+ case DOUBLE:
+ res = ((DoubleObjectInspector) fields.get(i).getFieldObjectInspector())
+ .get(values.get(i));
+ break;
+ case STRING:
+ res = ((StringObjectInspector) fields.get(i).getFieldObjectInspector())
+ .getPrimitiveJavaObject(
+ values.get(i));
+ break;
+ default:
+ throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+ }
+ value.put(columns[i], res);
+ }
+ value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+ ((TimestampObjectInspector) fields.get(columns.length).getFieldObjectInspector())
+ .getPrimitiveJavaObject(values.get(columns.length)).getTime()
+ );
+ return new DruidWritable(value);
}
@Override
public SerDeStats getSerDeStats() {
- throw new UnsupportedOperationException("SerdeStats not supported.");
+ // no support for statistics
+ return null;
}
@Override
@@ -327,13 +440,16 @@ public class DruidSerDe extends AbstractSerDe {
}
switch (types[i].getPrimitiveCategory()) {
case TIMESTAMP:
- output.add(new TimestampWritable(new Timestamp((Long)value)));
+ output.add(new TimestampWritable(new Timestamp((Long) value)));
break;
case LONG:
- output.add(new LongWritable(((Number)value).longValue()));
+ output.add(new LongWritable(((Number) value).longValue()));
break;
case FLOAT:
- output.add(new FloatWritable(((Number)value).floatValue()));
+ output.add(new FloatWritable(((Number) value).floatValue()));
+ break;
+ case DOUBLE:
+ output.add(new DoubleWritable(((Number) value).doubleValue()));
break;
case STRING:
output.add(new Text(value.toString()));
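
For illustration (not part of the patch): the map carried by the DruidWritable that serialize produces for one row of a hypothetical table (page string, added bigint, plus the timestamp and granularity columns) would look roughly like the sketch below; column names and values are made up.

import java.util.HashMap;
import java.util.Map;
import org.apache.calcite.adapter.druid.DruidTable;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.druid.serde.DruidWritable;

public class DruidSerDeOutputSketch {
  static DruidWritable exampleRow() {
    Map<String, Object> value = new HashMap<>();
    value.put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, 1480550400000L);  // row timestamp in millis
    value.put("page", "Main_Page");                                  // hypothetical STRING column
    value.put("added", 42L);                                         // hypothetical LONG column
    // truncated timestamp, used by DruidRecordWriter to pick the segment interval
    value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, 1480550400000L);
    return new DruidWritable(value);
  }
}
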
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
index 29b8845..64a19f6 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -31,14 +31,16 @@ public final class DruidSerDeUtils {
private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
protected static final String FLOAT_TYPE = "FLOAT";
+
protected static final String LONG_TYPE = "LONG";
+
protected static final String STRING_TYPE = "STRING";
/* This method converts from the String representation of Druid type
* to the corresponding Hive type */
public static PrimitiveTypeInfo convertDruidToHiveType(String typeName) {
typeName = typeName.toUpperCase();
- switch(typeName) {
+ switch (typeName) {
case FLOAT_TYPE:
return TypeInfoFactory.floatTypeInfo;
case LONG_TYPE:
@@ -61,7 +63,7 @@ public final class DruidSerDeUtils {
* to the String representation of the corresponding Hive type */
public static String convertDruidToHiveTypeString(String typeName) {
typeName = typeName.toUpperCase();
- switch(typeName) {
+ switch (typeName) {
case FLOAT_TYPE:
return serdeConstants.FLOAT_TYPE_NAME;
case LONG_TYPE:
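
Illustration only (not part of the patch) of the type mapping above; the lookups are case-insensitive because typeName is upper-cased first.

import org.apache.hadoop.hive.druid.serde.DruidSerDeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;

public class DruidTypeMappingSketch {
  public static void main(String[] args) {
    PrimitiveTypeInfo longType = DruidSerDeUtils.convertDruidToHiveType("long");    // Hive bigint
    PrimitiveTypeInfo floatType = DruidSerDeUtils.convertDruidToHiveType("FLOAT");  // Hive float
    String stringName = DruidSerDeUtils.convertDruidToHiveTypeString("STRING");     // "string"
    System.out.println(longType + " / " + floatType + " / " + stringName);
  }
}
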
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
index b91178c..8c2fb10 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
@@ -45,9 +45,12 @@ public class DruidTimeseriesQueryRecordReader
}
@Override
- protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content) throws IOException {
+ protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content)
+ throws IOException {
return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
- new TypeReference<List<Result<TimeseriesResultValue>>>(){});
+ new TypeReference<List<Result<TimeseriesResultValue>>>() {
+ }
+ );
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
index 22599c3..d431925 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
@@ -41,6 +41,7 @@ public class DruidTopNQueryRecordReader
extends DruidQueryRecordReader<TopNQuery, Result<TopNResultValue>> {
private Result<TopNResultValue> current;
+
private Iterator<DimensionAndMetricValueExtractor> values = Iterators.emptyIterator();
@Override
@@ -49,9 +50,12 @@ public class DruidTopNQueryRecordReader
}
@Override
- protected List<Result<TopNResultValue>> createResultsList(InputStream content) throws IOException {
+ protected List<Result<TopNResultValue>> createResultsList(InputStream content)
+ throws IOException {
return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
- new TypeReference<List<Result<TopNResultValue>>>(){});
+ new TypeReference<List<Result<TopNResultValue>>>() {
+ }
+ );
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
new file mode 100644
index 0000000..8770749
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
@@ -0,0 +1,181 @@
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.indexer.JobHelper;
+import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.Handle;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+public class DruidStorageHandlerTest {
+
+ @Rule
+ public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final String DATA_SOURCE_NAME = "testName";
+
+ private String segmentsTable;
+
+ private String tablePath;
+
+ private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
+ .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
+
+ @Before
+ public void before() throws Throwable {
+ tablePath = temporaryFolder.newFolder().getAbsolutePath();
+ segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+ Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
+ Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
+ Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0);
+ StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class);
+ Mockito.when(storageDes.getBucketColsSize()).thenReturn(0);
+ Mockito.when(tableMock.getSd()).thenReturn(storageDes);
+ Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME);
+ }
+
+ Table tableMock = Mockito.mock(Table.class);
+
+ @Test
+ public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ );
+
+ try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
+ Assert.assertFalse(derbyConnectorRule.getConnector()
+ .tableExists(handle,
+ segmentsTable
+ ));
+ druidStorageHandler.preCreateTable(tableMock);
+ Assert.assertTrue(derbyConnectorRule.getConnector()
+ .tableExists(handle,
+ segmentsTable
+ ));
+ }
+
+ }
+
+ @Test(expected = MetaException.class)
+ public void testPreCreateTableWhenDataSourceExists() throws MetaException {
+ derbyConnectorRule.getConnector().createSegmentTable();
+ SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(
+ derbyConnectorRule.getConnector());
+ sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment),
+ DruidStorageHandlerUtils.JSON_MAPPER
+ );
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ );
+ druidStorageHandler.preCreateTable(tableMock);
+ }
+
+ @Test
+ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
+ throws MetaException, IOException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ );
+ druidStorageHandler.preCreateTable(tableMock);
+ Configuration config = new Configuration();
+ config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+ config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tablePath);
+ druidStorageHandler.setConf(config);
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+ /*
+ The final descriptor path has the form tablePath/taskId_attemptId/segmentDescriptorDir/segmentIdentifier.json;
+ UUID.randomUUID() stands in for the taskId_attemptId component.
+ */
+ Path taskDirPath = new Path(tablePath, druidStorageHandler.makeStagingName());
+ Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+ new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+ );
+ DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+ druidStorageHandler.commitCreateTable(tableMock);
+ Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+ DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ )).toArray());
+ druidStorageHandler.commitDropTable(tableMock, false);
+ Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+ DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ )).toArray());
+
+ }
+
+ @Test
+ public void testDeleteSegment() throws IOException, SegmentLoadingException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ );
+
+ String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
+ Configuration config = new Configuration();
+ druidStorageHandler.setConf(config);
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+
+ Path segmentOutputPath = JobHelper
+ .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment);
+ Path indexPath = new Path(segmentOutputPath, "index.zip");
+ DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
+ ImmutableMap.<String, Object>of("path", indexPath)).build();
+ OutputStream outputStream = localFileSystem.create(indexPath, true);
+ outputStream.close();
+ Assert.assertTrue("index file is not created ??", localFileSystem.exists(indexPath));
+ Assert.assertTrue(localFileSystem.exists(segmentOutputPath));
+
+ druidStorageHandler.deleteSegment(dataSegmentWithLoadspect);
+ // path format --> .../dataSource/interval/version/partitionNum/xxx.zip
+ Assert.assertFalse("index file still present after deleteSegment", localFileSystem.exists(indexPath));
+ // path format of segmentOutputPath --> .../dataSource/interval/version/partitionNum/
+ Assert.assertFalse("partitionNum directory still present after deleteSegment",
+ localFileSystem.exists(segmentOutputPath)
+ );
+ Assert.assertFalse("version directory still present after deleteSegment",
+ localFileSystem.exists(segmentOutputPath.getParent())
+ );
+ Assert.assertFalse("interval directory still present after deleteSegment",
+ localFileSystem.exists(segmentOutputPath.getParent().getParent())
+ );
+ Assert.assertFalse("dataSource directory still present after deleteSegment",
+ localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
+ );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
index 2b4df78..8dc8091 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
@@ -41,34 +41,34 @@ public class QTestDruidSerDe extends DruidSerDe {
// + "\"usingDefaultInterval\":true,\"lenientAggregatorMerge\":false,\"descending\":false}";
private static final String RESPONSE =
"[ {\r\n "
- + " \"id\" : \"merged\",\r\n "
- + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n "
- + " \"columns\" : {\r\n "
- + " \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
- + " \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n "
- + " \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
- + " \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
- + " \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
- + " \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
- + " \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
- + " \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n "
- + " },\r\n "
- + " \"aggregators\" : {\r\n "
- + " \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n "
- + " \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n "
- + " \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n "
- + " \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n "
- + " \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n "
- + " },\r\n "
- + " \"queryGranularity\" : {\r\n \"type\": \"none\"\r\n },\r\n "
- + " \"size\" : 300000,\r\n "
- + " \"numRows\" : 5000000\r\n} ]";
+ + " \"id\" : \"merged\",\r\n "
+ + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n "
+ + " \"columns\" : {\r\n "
+ + " \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n "
+ + " \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n "
+ + " \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n "
+ + " \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n "
+ + " },\r\n "
+ + " \"aggregators\" : {\r\n "
+ + " \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n "
+ + " \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n "
+ + " \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n "
+ + " \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n "
+ + " \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n "
+ + " },\r\n "
+ + " \"queryGranularity\" : {\r\n \"type\": \"none\"\r\n },\r\n "
+ + " \"size\" : 300000,\r\n "
+ + " \"numRows\" : 5000000\r\n} ]";
/* Submits the request and returns */
@Override
@@ -78,7 +78,9 @@ public class QTestDruidSerDe extends DruidSerDe {
List<SegmentAnalysis> resultsList;
try {
resultsList = DruidStorageHandlerUtils.JSON_MAPPER.readValue(RESPONSE,
- new TypeReference<List<SegmentAnalysis>>() {});
+ new TypeReference<List<SegmentAnalysis>>() {
+ }
+ );
} catch (Exception e) {
throw new SerDeException(StringUtils.stringifyException(e));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
new file mode 100644
index 0000000..75c0129
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
@@ -0,0 +1,108 @@
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.storage.derby.DerbyConnector;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+
+import java.sql.SQLException;
+import java.util.UUID;
+
+public class TestDerbyConnector extends DerbyConnector {
+ private final String jdbcUri;
+
+ public TestDerbyConnector(
+ Supplier<MetadataStorageConnectorConfig> config,
+ Supplier<MetadataStorageTablesConfig> dbTables
+ ) {
+ this(config, dbTables, "jdbc:derby:memory:druidTest" + dbSafeUUID());
+ }
+
+ protected TestDerbyConnector(
+ Supplier<MetadataStorageConnectorConfig> config,
+ Supplier<MetadataStorageTablesConfig> dbTables,
+ String jdbcUri
+ ) {
+ super(config, dbTables, new DBI(jdbcUri + ";create=true"));
+ this.jdbcUri = jdbcUri;
+ }
+
+ public void tearDown() {
+ try {
+ new DBI(jdbcUri + ";drop=true").open().close();
+ } catch (UnableToObtainConnectionException e) {
+ SQLException cause = (SQLException) e.getCause();
+ // error code "08006" indicates proper shutdown
+ Assert.assertEquals(String.format("Derby not shutdown: [%s]", cause.toString()), "08006",
+ cause.getSQLState()
+ );
+ }
+ }
+
+ public static String dbSafeUUID() {
+ return UUID.randomUUID().toString().replace("-", "");
+ }
+
+ public String getJdbcUri() {
+ return jdbcUri;
+ }
+
+ public static class DerbyConnectorRule extends ExternalResource {
+ private TestDerbyConnector connector;
+
+ private final Supplier<MetadataStorageTablesConfig> dbTables;
+
+ private final MetadataStorageConnectorConfig connectorConfig;
+
+ public DerbyConnectorRule() {
+ this("druidTest" + dbSafeUUID());
+ }
+
+ private DerbyConnectorRule(
+ final String defaultBase
+ ) {
+ this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase)));
+ }
+
+ public DerbyConnectorRule(
+ Supplier<MetadataStorageTablesConfig> dbTables
+ ) {
+ this.dbTables = dbTables;
+ this.connectorConfig = new MetadataStorageConnectorConfig() {
+ @Override
+ public String getConnectURI() {
+ return connector.getJdbcUri();
+ }
+ };
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ connector = new TestDerbyConnector(Suppliers.ofInstance(connectorConfig), dbTables);
+ connector.getDBI().open().close(); // create db
+ }
+
+ @Override
+ protected void after() {
+ connector.tearDown();
+ }
+
+ public TestDerbyConnector getConnector() {
+ return connector;
+ }
+
+ public MetadataStorageConnectorConfig getMetadataConnectorConfig() {
+ return connectorConfig;
+ }
+
+ public Supplier<MetadataStorageTablesConfig> metadataTablesConfigSupplier() {
+ return dbTables;
+ }
+ }
+
+}