Posted to commits@hive.apache.org by jc...@apache.org on 2016/12/15 20:51:37 UTC

[3/4] hive git commit: HIVE-15277: Teach Hive how to create/delete Druid segments (Slim Bouguerra, reviewed by Jesus Camacho Rodriguez)

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
new file mode 100644
index 0000000..7ac52c6
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Druids.SelectQueryBuilder;
+import io.druid.query.Druids.TimeBoundaryQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.Result;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.PagingSpec;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.timeboundary.TimeBoundaryQuery;
+import io.druid.query.timeboundary.TimeBoundaryResultValue;
+import org.apache.calcite.adapter.druid.DruidDateTimeUtils;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.joda.time.chrono.ISOChronology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Druid query based input format.
+ *
+ * Given a Druid query and the broker address, it sends the query to the broker,
+ * then retrieves and parses the results.
+ */
+public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidWritable>
+        implements org.apache.hadoop.mapred.InputFormat<NullWritable, DruidWritable> {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidQueryBasedInputFormat.class);
+
+  @Override
+  public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+          throws IOException {
+    return getInputSplits(job);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+    return Arrays.<InputSplit>asList(getInputSplits(context.getConfiguration()));
+  }
+
+  @SuppressWarnings("deprecation")
+  private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
+    String address = HiveConf.getVar(conf,
+            HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
+    );
+    if (StringUtils.isEmpty(address)) {
+      throw new IOException("Druid broker address not specified in configuration");
+    }
+    String druidQuery = StringEscapeUtils.unescapeJava(conf.get(Constants.DRUID_QUERY_JSON));
+    String druidQueryType;
+    if (StringUtils.isEmpty(druidQuery)) {
+      // Empty, maybe because CBO did not run; we fall back to a
+      // full Select query
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Druid query is empty; creating Select query");
+      }
+      String dataSource = conf.get(Constants.DRUID_DATA_SOURCE);
+      if (dataSource == null) {
+        throw new IOException("Druid data source cannot be empty");
+      }
+      druidQuery = createSelectStarQuery(dataSource);
+      druidQueryType = Query.SELECT;
+    } else {
+      druidQueryType = conf.get(Constants.DRUID_QUERY_TYPE);
+      if (druidQueryType == null) {
+        throw new IOException("Druid query type not specified in configuration");
+      }
+    }
+
+    // Hive depends on FileSplits
+    Job job = new Job(conf);
+    JobContext jobContext = ShimLoader.getHadoopShims().newJobContext(job);
+    Path[] paths = FileInputFormat.getInputPaths(jobContext);
+
+    switch (druidQueryType) {
+      case Query.TIMESERIES:
+      case Query.TOPN:
+      case Query.GROUP_BY:
+        return new HiveDruidSplit[] { new HiveDruidSplit(address, druidQuery, paths[0]) };
+      case Query.SELECT:
+        return splitSelectQuery(conf, address, druidQuery, paths[0]);
+      default:
+        throw new IOException("Druid query type not recognized");
+    }
+  }
+
+  private static String createSelectStarQuery(String dataSource) throws IOException {
+    // Create Select query
+    SelectQueryBuilder builder = new Druids.SelectQueryBuilder();
+    builder.dataSource(dataSource);
+    builder.intervals(Arrays.asList(DruidTable.DEFAULT_INTERVAL));
+    builder.pagingSpec(PagingSpec.newSpec(1));
+    Map<String, Object> context = new HashMap<>();
+    context.put(Constants.DRUID_QUERY_FETCH, false);
+    builder.context(context);
+    return DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(builder.build());
+  }
+
+  /* Splits the Select query into multiple queries based on the configured
+   * threshold so that the read can be parallelized. */
+  private static HiveDruidSplit[] splitSelectQuery(Configuration conf, String address,
+          String druidQuery, Path dummyPath
+  ) throws IOException {
+    final int selectThreshold = (int) HiveConf.getIntVar(
+            conf, HiveConf.ConfVars.HIVE_DRUID_SELECT_THRESHOLD);
+    final int numConnection = HiveConf
+            .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
+    final Period readTimeout = new Period(
+            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
+    SelectQuery query;
+    try {
+      query = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+    if (isFetch) {
+      // If it has a limit, we use it and we do not split the query
+      return new HiveDruidSplit[] { new HiveDruidSplit(
+              address, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath) };
+    }
+
+    // We do not know the number of rows, so we execute a Segment Metadata
+    // query to obtain it
+    SegmentMetadataQueryBuilder metadataBuilder = new Druids.SegmentMetadataQueryBuilder();
+    metadataBuilder.dataSource(query.getDataSource());
+    metadataBuilder.intervals(query.getIntervals());
+    metadataBuilder.merge(true);
+    metadataBuilder.analysisTypes();
+    SegmentMetadataQuery metadataQuery = metadataBuilder.build();
+    final Lifecycle lifecycle = new Lifecycle();
+    HttpClient client = HttpClientInit.createClient(
+            HttpClientConfig.builder().withNumConnections(numConnection)
+                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Lifecycle start issue", e);
+    }
+    InputStream response;
+    try {
+      response = DruidStorageHandlerUtils.submitRequest(client,
+              DruidStorageHandlerUtils.createRequest(address, metadataQuery)
+      );
+    } catch (Exception e) {
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    } finally {
+      lifecycle.stop();
+    }
+
+    // Retrieve results
+    List<SegmentAnalysis> metadataList;
+    try {
+      metadataList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+              new TypeReference<List<SegmentAnalysis>>() {
+              }
+      );
+    } catch (Exception e) {
+      response.close();
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+    if (metadataList == null || metadataList.isEmpty()) {
+      throw new IOException("Connected to Druid but could not retrieve datasource information");
+    }
+    if (metadataList.size() != 1) {
+      throw new IOException("Information about segments should have been merged");
+    }
+
+    final long numRows = metadataList.get(0).getNumRows();
+
+    query = query.withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
+    if (numRows <= selectThreshold) {
+      // We are not going to split it
+      return new HiveDruidSplit[] { new HiveDruidSplit(address,
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath
+      ) };
+    }
+
+    // If the query does not specify a time interval, we obtain the actual time
+    // range using a Time Boundary query. Then, we use that information to split
+    // the query according to the Select threshold configuration property
+    final List<Interval> intervals = new ArrayList<>();
+    if (query.getIntervals().size() == 1 && query.getIntervals().get(0).withChronology(
+            ISOChronology.getInstanceUTC()).equals(DruidTable.DEFAULT_INTERVAL)) {
+      // Default max and min; execute a Time Boundary query to get a
+      // more precise range
+      TimeBoundaryQueryBuilder timeBuilder = new Druids.TimeBoundaryQueryBuilder();
+      timeBuilder.dataSource(query.getDataSource());
+      TimeBoundaryQuery timeQuery = timeBuilder.build();
+
+      try {
+        response = DruidStorageHandlerUtils.submitRequest(client,
+                DruidStorageHandlerUtils.createRequest(address, timeQuery)
+        );
+      } catch (Exception e) {
+        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
+
+      // Retrieve results
+      List<Result<TimeBoundaryResultValue>> timeList;
+      try {
+        timeList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
+                new TypeReference<List<Result<TimeBoundaryResultValue>>>() {
+                }
+        );
+      } catch (Exception e) {
+        response.close();
+        throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
+      if (timeList == null || timeList.isEmpty()) {
+        throw new IOException(
+                "Connected to Druid but could not retrieve time boundary information");
+      }
+      if (timeList.size() != 1) {
+        throw new IOException("We should obtain a single time boundary");
+      }
+
+      intervals.add(new Interval(timeList.get(0).getValue().getMinTime().getMillis(),
+              timeList.get(0).getValue().getMaxTime().getMillis(), ISOChronology.getInstanceUTC()
+      ));
+    } else {
+      intervals.addAll(query.getIntervals());
+    }
+
+    // Create ceil(numRows / selectThreshold) input splits
+    int numSplits = (int) Math.ceil((double) numRows / selectThreshold);
+    List<List<Interval>> newIntervals = createSplitsIntervals(intervals, numSplits);
+    HiveDruidSplit[] splits = new HiveDruidSplit[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      // Create partial Select query
+      final SelectQuery partialQuery = query.withQuerySegmentSpec(
+              new MultipleIntervalSegmentSpec(newIntervals.get(i)));
+      splits[i] = new HiveDruidSplit(address,
+              DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath
+      );
+    }
+    return splits;
+  }
+
+  private static List<List<Interval>> createSplitsIntervals(List<Interval> intervals, int numSplits
+  ) {
+    final long totalTime = DruidDateTimeUtils.extractTotalTime(intervals);
+    long startTime = intervals.get(0).getStartMillis();
+    long endTime = startTime;
+    long currTime = 0;
+    List<List<Interval>> newIntervals = new ArrayList<>();
+    for (int i = 0, posIntervals = 0; i < numSplits; i++) {
+      final long rangeSize = Math.round((double) (totalTime * (i + 1)) / numSplits) -
+              Math.round((double) (totalTime * i) / numSplits);
+      // Create the new interval(s)
+      List<Interval> currentIntervals = new ArrayList<>();
+      while (posIntervals < intervals.size()) {
+        final Interval interval = intervals.get(posIntervals);
+        final long expectedRange = rangeSize - currTime;
+        if (interval.getEndMillis() - startTime >= expectedRange) {
+          endTime = startTime + expectedRange;
+          currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
+          startTime = endTime;
+          currTime = 0;
+          break;
+        }
+        endTime = interval.getEndMillis();
+        currentIntervals.add(new Interval(startTime, endTime, ISOChronology.getInstanceUTC()));
+        currTime += (endTime - startTime);
+        startTime = intervals.get(++posIntervals).getStartMillis();
+      }
+      newIntervals.add(currentIntervals);
+    }
+    assert endTime == intervals.get(intervals.size() - 1).getEndMillis();
+    return newIntervals;
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> getRecordReader(
+          org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter
+  )
+          throws IOException {
+    // We need to provide a different record reader for every type of Druid query,
+    // because the format of the Druid results differs for each query type.
+    final DruidQueryRecordReader<?, ?> reader;
+    final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE);
+    if (druidQueryType == null) {
+      reader = new DruidSelectQueryRecordReader(); // By default
+      reader.initialize((HiveDruidSplit) split, job);
+      return reader;
+    }
+    switch (druidQueryType) {
+      case Query.TIMESERIES:
+        reader = new DruidTimeseriesQueryRecordReader();
+        break;
+      case Query.TOPN:
+        reader = new DruidTopNQueryRecordReader();
+        break;
+      case Query.GROUP_BY:
+        reader = new DruidGroupByQueryRecordReader();
+        break;
+      case Query.SELECT:
+        reader = new DruidSelectQueryRecordReader();
+        break;
+      default:
+        throw new IOException("Druid query type not recognized");
+    }
+    reader.initialize((HiveDruidSplit) split, job);
+    return reader;
+  }
+
+  @Override
+  public RecordReader<NullWritable, DruidWritable> createRecordReader(InputSplit split,
+          TaskAttemptContext context
+  ) throws IOException, InterruptedException {
+    // We need to provide a different record reader for every type of Druid query,
+    // because the format of the Druid results differs for each query type.
+    final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE);
+    if (druidQueryType == null) {
+      return new DruidSelectQueryRecordReader(); // By default
+    }
+    final DruidQueryRecordReader<?, ?> reader;
+    switch (druidQueryType) {
+      case Query.TIMESERIES:
+        reader = new DruidTimeseriesQueryRecordReader();
+        break;
+      case Query.TOPN:
+        reader = new DruidTopNQueryRecordReader();
+        break;
+      case Query.GROUP_BY:
+        reader = new DruidGroupByQueryRecordReader();
+        break;
+      case Query.SELECT:
+        reader = new DruidSelectQueryRecordReader();
+        break;
+      default:
+        throw new IOException("Druid query type not recognized");
+    }
+    return reader;
+  }
+
+}
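
For readers of the patch, here is a minimal, self-contained sketch (not part of the commit) of the arithmetic that splitSelectQuery and createSplitsIntervals implement: once the Segment Metadata query reports more rows than the HIVE_DRUID_SELECT_THRESHOLD setting allows, the scanned time range is cut into ceil(numRows / threshold) contiguous slices, one per HiveDruidSplit. The row count, threshold, and interval below are made-up values.

    import org.joda.time.Interval;
    import org.joda.time.chrono.ISOChronology;
    import java.util.ArrayList;
    import java.util.List;

    public class SelectSplitSketch {
      public static void main(String[] args) {
        long numRows = 250_000;          // reported by the Segment Metadata query (made-up value)
        int selectThreshold = 100_000;   // HIVE_DRUID_SELECT_THRESHOLD (made-up value)
        int numSplits = (int) Math.ceil((double) numRows / selectThreshold);   // -> 3 splits

        // One query interval of 3,000,000 ms, cut into numSplits contiguous slices
        Interval scan = new Interval(0L, 3_000_000L, ISOChronology.getInstanceUTC());
        long total = scan.getEndMillis() - scan.getStartMillis();
        long start = scan.getStartMillis();
        List<Interval> slices = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
          long end = scan.getStartMillis() + Math.round((double) (total * (i + 1)) / numSplits);
          slices.add(new Interval(start, end, ISOChronology.getInstanceUTC()));
          start = end;
        }
        System.out.println(slices);      // three 1,000,000 ms intervals, one per HiveDruidSplit
      }
    }

The patch handles the more general case of several input intervals by carrying a running position across them, but the per-split range size is computed the same way.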

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
new file mode 100644
index 0000000..1601a9a
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -0,0 +1,260 @@
+package org.apache.hadoop.hive.druid.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.Committer;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.SegmentNotWritableException;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.plumber.Committers;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritable>,
+        org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
+  protected static final Logger LOG = LoggerFactory.getLogger(DruidRecordWriter.class);
+
+  private final DataSchema dataSchema;
+
+  private final Appenderator appenderator;
+
+  private final RealtimeTuningConfig tuningConfig;
+
+  private final Path segmentsDescriptorDir;
+
+  private SegmentIdentifier currentOpenSegment = null;
+
+  private final Integer maxPartitionSize;
+
+  private final FileSystem fileSystem;
+
+  private final Supplier<Committer> committerSupplier;
+
+  public DruidRecordWriter(
+          DataSchema dataSchema,
+          RealtimeTuningConfig realtimeTuningConfig,
+          DataSegmentPusher dataSegmentPusher,
+          int maxPartitionSize,
+          final Path segmentsDescriptorsDir,
+          final FileSystem fileSystem
+  ) {
+    this.tuningConfig = Preconditions
+            .checkNotNull(realtimeTuningConfig, "realtimeTuningConfig is null");
+    this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
+    appenderator = Appenderators
+            .createOffline(this.dataSchema,
+                    tuningConfig,
+                    new FireDepartmentMetrics(), dataSegmentPusher,
+                    DruidStorageHandlerUtils.JSON_MAPPER,
+                    DruidStorageHandlerUtils.INDEX_IO,
+                    DruidStorageHandlerUtils.INDEX_MERGER_V9
+            );
+    Preconditions.checkArgument(maxPartitionSize > 0, "maxPartitionSize needs to be greater than 0");
+    this.maxPartitionSize = maxPartitionSize;
+    appenderator.startJob(); // maybe we need to move this out of the constructor
+    this.segmentsDescriptorDir = Preconditions
+            .checkNotNull(segmentsDescriptorsDir, "segmentsDescriptorsDir is null");
+    this.fileSystem = Preconditions.checkNotNull(fileSystem, "file system is null");
+    committerSupplier = Suppliers.ofInstance(Committers.nil());
+  }
+
+  /**
+   * Computes the segment identifier for the given timestamp and may push the current
+   * open segment. The push occurs when the maximum partition size is reached or the
+   * event belongs to the next interval.
+   * Note that this function assumes that timestamps are pseudo-sorted; the sorting is
+   * done by the previous stage. It will close the open segment and move to the next
+   * segment granularity as soon as an event from the next interval appears.
+   *
+   * @return the segment identifier corresponding to truncatedTime; the current open
+   *         segment may be pushed as a side effect.
+   */
+  private SegmentIdentifier getSegmentIdentifierAndMaybePush(long truncatedTime) {
+
+    final Granularity segmentGranularity = dataSchema.getGranularitySpec()
+            .getSegmentGranularity();
+
+    final Interval interval = new Interval(
+            new DateTime(truncatedTime),
+            segmentGranularity.increment(new DateTime(truncatedTime))
+    );
+
+    SegmentIdentifier retVal;
+    if (currentOpenSegment == null) {
+      retVal = new SegmentIdentifier(
+              dataSchema.getDataSource(),
+              interval,
+              tuningConfig.getVersioningPolicy().getVersion(interval),
+              new LinearShardSpec(0)
+      );
+      currentOpenSegment = retVal;
+      return retVal;
+    } else if (currentOpenSegment.getInterval().equals(interval)) {
+      retVal = currentOpenSegment;
+      int rowCount = appenderator.getRowCount(retVal);
+      if (rowCount < maxPartitionSize) {
+        return retVal;
+      } else {
+        retVal = new SegmentIdentifier(
+                dataSchema.getDataSource(),
+                interval,
+                tuningConfig.getVersioningPolicy().getVersion(interval),
+                new LinearShardSpec(currentOpenSegment.getShardSpec().getPartitionNum() + 1)
+        );
+        pushSegments(Lists.newArrayList(currentOpenSegment));
+        currentOpenSegment = retVal;
+        return retVal;
+      }
+    } else {
+      retVal = new SegmentIdentifier(
+              dataSchema.getDataSource(),
+              interval,
+              tuningConfig.getVersioningPolicy().getVersion(interval),
+              new LinearShardSpec(0)
+      );
+      pushSegments(Lists.newArrayList(currentOpenSegment));
+      currentOpenSegment = retVal;
+      return retVal;
+    }
+  }
+
+  private void pushSegments(List<SegmentIdentifier> segmentsToPush) {
+    try {
+      SegmentsAndMetadata segmentsAndMetadata = appenderator
+              .push(segmentsToPush, committerSupplier.get()).get();
+      final HashSet<String> pushedSegmentIdentifierHashSet = new HashSet<>();
+
+      for (DataSegment pushedSegment : segmentsAndMetadata.getSegments()) {
+        pushedSegmentIdentifierHashSet
+                .add(SegmentIdentifier.fromDataSegment(pushedSegment).getIdentifierAsString());
+        final Path segmentDescriptorOutputPath = DruidStorageHandlerUtils
+                .makeSegmentDescriptorOutputPath(pushedSegment, segmentsDescriptorDir);
+        DruidStorageHandlerUtils
+                .writeSegmentDescriptor(fileSystem, pushedSegment, segmentDescriptorOutputPath);
+
+        LOG.info(
+                String.format(
+                        "Pushed the segment [%s] and persisted the descriptor located at [%s]",
+                        pushedSegment,
+                        segmentDescriptorOutputPath
+                )
+        );
+      }
+
+      final HashSet<String> toPushSegmentsHashSet = new HashSet<>(
+              FluentIterable.from(segmentsToPush)
+                      .transform(new Function<SegmentIdentifier, String>() {
+                        @Nullable
+                        @Override
+                        public String apply(
+                                @Nullable SegmentIdentifier input
+                        ) {
+                          return input.getIdentifierAsString();
+                        }
+                      })
+                      .toList());
+
+      if (!pushedSegmentIdentifierHashSet.equals(toPushSegmentsHashSet)) {
+        throw new IllegalStateException(String.format(
+                "was asked to publish [%s] but was able to publish only [%s]",
+                Joiner.on(", ").join(toPushSegmentsHashSet),
+                Joiner.on(", ").join(pushedSegmentIdentifierHashSet)
+        ));
+      }
+
+      LOG.info(String.format("Published [%,d] segments.", segmentsToPush.size()));
+    } catch (InterruptedException e) {
+      LOG.error(String.format("Got interrupted; failed to push [%,d] segments.",
+              segmentsToPush.size()
+      ), e);
+      Thread.currentThread().interrupt();
+    } catch (IOException | ExecutionException e) {
+      LOG.error(String.format("Failed to push [%,d] segments.", segmentsToPush.size()), e);
+      Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void write(Writable w) throws IOException {
+    DruidWritable record = (DruidWritable) w;
+    final long timestamp = (long) record.getValue().get(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
+    final long truncatedTime = (long) record.getValue()
+            .get(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
+
+    InputRow inputRow = new MapBasedInputRow(
+            timestamp,
+            dataSchema.getParser()
+                    .getParseSpec()
+                    .getDimensionsSpec()
+                    .getDimensionNames(),
+            record.getValue()
+    );
+
+    try {
+      appenderator
+              .add(getSegmentIdentifierAndMaybePush(truncatedTime), inputRow, committerSupplier);
+    } catch (SegmentNotWritableException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void close(boolean abort) throws IOException {
+    try {
+      if (!abort) {
+        final List<SegmentIdentifier> segmentsToPush = Lists.newArrayList();
+        segmentsToPush.addAll(appenderator.getSegments());
+        pushSegments(segmentsToPush);
+      }
+      appenderator.clear();
+    } catch (InterruptedException e) {
+      Throwables.propagate(e);
+    } finally {
+      appenderator.close();
+    }
+  }
+
+  @Override
+  public void write(NullWritable key, DruidWritable value) throws IOException {
+    this.write(value);
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    this.close(true);
+  }
+
+}
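
The writer above decides, for every incoming row, whether to keep appending, roll to a new partition, or roll to a new interval. A minimal sketch (not part of the commit) of that decision, with all names local to the sketch:

    public class SegmentRollSketch {
      enum Action { OPEN_FIRST, APPEND, PUSH_THEN_NEXT_PARTITION, PUSH_THEN_NEW_INTERVAL }

      // Mirrors getSegmentIdentifierAndMaybePush, reduced to a pure decision.
      static Action decide(boolean hasOpenSegment, boolean sameInterval,
              int openSegmentRowCount, int maxPartitionSize) {
        if (!hasOpenSegment) {
          return Action.OPEN_FIRST;                    // first row: open partition 0, nothing to push
        }
        if (sameInterval) {
          return openSegmentRowCount < maxPartitionSize
                  ? Action.APPEND                      // room left in the open partition
                  : Action.PUSH_THEN_NEXT_PARTITION;   // full: push it, open partition N+1
        }
        // Row falls into the next granularity bucket; since input is pseudo-sorted by time,
        // the open segment can be pushed and partition 0 of the new interval opened.
        return Action.PUSH_THEN_NEW_INTERVAL;
      }

      public static void main(String[] args) {
        System.out.println(decide(true, true, 5_000_000, 5_000_000));  // PUSH_THEN_NEXT_PARTITION
      }
    }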

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
new file mode 100644
index 0000000..861075d
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/HiveDruidSplit.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Druid split. Its purpose is to trigger query execution in Druid.
+ */
+public class HiveDruidSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit {
+
+  private String address;
+
+  private String druidQuery;
+
+  // required for deserialization
+  public HiveDruidSplit() {
+    super((Path) null, 0, 0, (String[]) null);
+  }
+
+  public HiveDruidSplit(String address, String druidQuery, Path dummyPath) {
+    super(dummyPath, 0, 0, (String[]) null);
+    this.address = address;
+    this.druidQuery = druidQuery;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeUTF(address);
+    out.writeUTF(druidQuery);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    address = in.readUTF();
+    druidQuery = in.readUTF();
+  }
+
+  @Override
+  public long getLength() {
+    return 0L;
+  }
+
+  @Override
+  public String[] getLocations() {
+    return new String[] { "" };
+  }
+
+  public String getAddress() {
+    return address;
+  }
+
+  public String getDruidQuery() {
+    return druidQuery;
+  }
+
+  @Override
+  public String toString() {
+    return "HiveDruidSplit{" + address + ", " + druidQuery + "}";
+  }
+
+}
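
As a quick illustration (not part of the commit) of how the split carries its state from the client to the tasks, the broker address and the serialized query round-trip through write()/readFields() on top of FileSplit's own fields. The address, query string, and path below are placeholders.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class SplitRoundTripSketch {
      public static void main(String[] args) throws IOException {
        HiveDruidSplit out = new HiveDruidSplit(
                "localhost:8082",                      // broker address (placeholder)
                "{\"queryType\":\"select\"}",          // serialized Druid query (placeholder)
                new Path("/tmp/dummy"));               // dummy path; Hive only needs a FileSplit
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buffer));

        HiveDruidSplit in = new HiveDruidSplit();      // no-arg constructor used for deserialization
        in.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(in.getAddress() + " -> " + in.getDruidQuery());
      }
    }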

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
index f97f820..9e8b439 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidGroupByQueryRecordReader.java
@@ -42,7 +42,9 @@ public class DruidGroupByQueryRecordReader
         extends DruidQueryRecordReader<GroupByQuery, Row> {
 
   private Row current;
+
   private int[] indexes = new int[0];
+
   // Row objects returned by GroupByQuery have different access paths depending on
   // whether the result for the metric is a Float or a Long, thus we keep track
   // using these converters
@@ -62,11 +64,14 @@ public class DruidGroupByQueryRecordReader
   @Override
   protected List<Row> createResultsList(InputStream content) throws IOException {
     return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Row>>(){});
+            new TypeReference<List<Row>>() {
+            }
+    );
   }
 
   private void initExtractors() throws IOException {
-    extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs().size()];
+    extractors = new Extract[query.getAggregatorSpecs().size() + query.getPostAggregatorSpecs()
+            .size()];
     int counter = 0;
     for (int i = 0; i < query.getAggregatorSpecs().size(); i++, counter++) {
       AggregatorFactory af = query.getAggregatorSpecs().get(i);
@@ -103,7 +108,7 @@ public class DruidGroupByQueryRecordReader
     if (results.hasNext()) {
       current = results.next();
       indexes = new int[query.getDimensions().size()];
-      for (int i=0; i < query.getDimensions().size(); i++) {
+      for (int i = 0; i < query.getDimensions().size(); i++) {
         DimensionSpec ds = query.getDimensions().get(i);
         indexes[i] = current.getDimension(ds.getDimension()).size() - 1;
       }
@@ -124,7 +129,7 @@ public class DruidGroupByQueryRecordReader
     // 1) The timestamp column
     value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
     // 2) The dimension columns
-    for (int i=0; i < query.getDimensions().size(); i++) {
+    for (int i = 0; i < query.getDimensions().size(); i++) {
       DimensionSpec ds = query.getDimensions().get(i);
       List<String> dims = current.getDimension(ds.getDimension());
       if (dims.size() == 0) {
@@ -163,7 +168,7 @@ public class DruidGroupByQueryRecordReader
       // 1) The timestamp column
       value.getValue().put(DruidTable.DEFAULT_TIMESTAMP_COLUMN, current.getTimestamp().getMillis());
       // 2) The dimension columns
-      for (int i=0; i < query.getDimensions().size(); i++) {
+      for (int i = 0; i < query.getDimensions().size(); i++) {
         DimensionSpec ds = query.getDimensions().get(i);
         List<String> dims = current.getDimension(ds.getDimension());
         if (dims.size() == 0) {
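
The hunks above are formatting-only, but the comment near the top of this file's diff (GroupBy Row metrics having different access paths for Float and Long) is worth a concrete illustration. A minimal sketch, not part of the commit, assuming Druid's io.druid.data.input.Row exposes getFloatMetric/getLongMetric as in the 0.9.x line this patch builds against:

    import io.druid.data.input.Row;

    public class MetricExtractSketch {
      enum Extract { FLOAT, LONG }   // one converter per aggregator/post-aggregator

      // Reads a metric with the accessor that matches its storage type.
      static Object extract(Row row, String metricName, Extract type) {
        switch (type) {
          case FLOAT:
            return row.getFloatMetric(metricName);   // float-backed metric
          case LONG:
            return row.getLongMetric(metricName);    // long-backed metric
          default:
            throw new IllegalStateException("Unknown extractor " + type);
        }
      }
    }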

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index fe6213b..dc9d6a0 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -17,15 +17,16 @@
  */
 package org.apache.hadoop.hive.druid.serde;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.collect.Iterators;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.BaseQuery;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.druid.HiveDruidSplit;
+import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -34,24 +35,21 @@ import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterators;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.BaseQuery;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
 
 /**
  * Base record reader for a given Druid query. This class contains the logic to
  * send the query to the broker and retrieve the results. The transformation to
  * emit records needs to be done by the classes that extend the reader.
- * 
+ *
  * The key for each record will be a NullWritable, while the value will be a
  * DruidWritable containing the timestamp as well as all values resulting from
  * the query.
  */
-public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Comparable<R>>
+public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends Comparable<R>>
         extends RecordReader<NullWritable, DruidWritable>
         implements org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> {
 
@@ -83,6 +81,7 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Co
       LOG.info("Retrieving from druid using query:\n " + query);
     }
 
+    final Lifecycle lifecycle = new Lifecycle();
     final int numConnection = HiveConf
             .getIntVar(conf, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
     final Period readTimeout = new Period(
@@ -90,10 +89,17 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>,R extends Co
 
     HttpClient client = HttpClientInit.createClient(
             HttpClientConfig.builder().withReadTimeout(readTimeout.toStandardDuration())
-                    .withNumConnections(numConnection).build(), new Lifecycle());
-    InputStream response = DruidStorageHandlerUtils.submitRequest(client,
-            DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query));
+                    .withNumConnections(numConnection).build(), lifecycle);
 
+    try {
+      lifecycle.start();
+    } catch (Exception e) {
+      LOG.error("Issues with lifecycle start", e);
+    }
+    InputStream response = DruidStorageHandlerUtils.submitRequest(client,
+            DruidStorageHandlerUtils.createRequest(hiveDruidSplit.getAddress(), query)
+    );
+    lifecycle.stop();
     // Retrieve results
     List<R> resultsList;
     try {
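
The change in this hunk wraps the HTTP call in an explicit Lifecycle start/stop, mirroring what DruidQueryBasedInputFormat and DruidSerDe now do. A minimal sketch of the pattern, not part of the commit; the connection count and read timeout are placeholder values:

    import com.metamx.common.lifecycle.Lifecycle;
    import com.metamx.http.client.HttpClient;
    import com.metamx.http.client.HttpClientConfig;
    import com.metamx.http.client.HttpClientInit;
    import io.druid.query.metadata.metadata.SegmentMetadataQuery;
    import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
    import org.joda.time.Period;
    import java.io.IOException;
    import java.io.InputStream;

    public class LifecycleRequestSketch {
      // Submits a Segment Metadata query to the broker, starting and stopping the
      // Lifecycle around the call as the patched readers do.
      static InputStream submit(String address, SegmentMetadataQuery query) throws IOException {
        final Lifecycle lifecycle = new Lifecycle();
        HttpClient client = HttpClientInit.createClient(
                HttpClientConfig.builder()
                        .withNumConnections(20)                                    // placeholder
                        .withReadTimeout(new Period("PT1M").toStandardDuration())  // placeholder
                        .build(), lifecycle);
        try {
          lifecycle.start();                          // the client is only usable once started
          return DruidStorageHandlerUtils.submitRequest(client,
                  DruidStorageHandlerUtils.createRequest(address, query));
        } catch (Exception e) {
          throw new IOException(e);
        } finally {
          lifecycle.stop();                           // stop the lifecycle once the call returns
        }
      }
    }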

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
index c30ac56..8a41e91 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
@@ -41,6 +41,7 @@ public class DruidSelectQueryRecordReader
         extends DruidQueryRecordReader<SelectQuery, Result<SelectResultValue>> {
 
   private Result<SelectResultValue> current;
+
   private Iterator<EventHolder> values = Iterators.emptyIterator();
 
   @Override
@@ -49,9 +50,12 @@ public class DruidSelectQueryRecordReader
   }
 
   @Override
-  protected List<Result<SelectResultValue>> createResultsList(InputStream content) throws IOException {
+  protected List<Result<SelectResultValue>> createResultsList(InputStream content)
+          throws IOException {
     return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Result<SelectResultValue>>>(){});
+            new TypeReference<List<Result<SelectResultValue>>>() {
+            }
+    );
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index eb78a70..2e90df1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -17,19 +17,33 @@
  */
 package org.apache.hadoop.hive.druid.serde;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Properties;
-
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.metamx.common.lifecycle.Lifecycle;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.HttpClientConfig;
+import com.metamx.http.client.HttpClientInit;
+import io.druid.query.Druids;
+import io.druid.query.Druids.SegmentMetadataQueryBuilder;
+import io.druid.query.Query;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.metadata.metadata.ColumnAnalysis;
+import io.druid.query.metadata.metadata.SegmentAnalysis;
+import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.select.SelectQuery;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.topn.TopNQuery;
 import org.apache.calcite.adapter.druid.DruidTable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeSpec;
@@ -37,12 +51,19 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
@@ -50,43 +71,34 @@ import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Lists;
-import com.metamx.common.lifecycle.Lifecycle;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.HttpClientConfig;
-import com.metamx.http.client.HttpClientInit;
-
-import io.druid.query.Druids;
-import io.druid.query.Druids.SegmentMetadataQueryBuilder;
-import io.druid.query.Query;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.PostAggregator;
-import io.druid.query.dimension.DimensionSpec;
-import io.druid.query.groupby.GroupByQuery;
-import io.druid.query.metadata.metadata.ColumnAnalysis;
-import io.druid.query.metadata.metadata.SegmentAnalysis;
-import io.druid.query.metadata.metadata.SegmentMetadataQuery;
-import io.druid.query.select.SelectQuery;
-import io.druid.query.timeseries.TimeseriesQuery;
-import io.druid.query.topn.TopNQuery;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
 
 /**
  * DruidSerDe that is used to deserialize objects from a Druid data source.
  */
-@SerDeSpec(schemaProps = {Constants.DRUID_DATA_SOURCE})
+@SerDeSpec(schemaProps = { Constants.DRUID_DATA_SOURCE })
 public class DruidSerDe extends AbstractSerDe {
 
   protected static final Logger LOG = LoggerFactory.getLogger(DruidSerDe.class);
 
   private String[] columns;
+
   private PrimitiveTypeInfo[] types;
-  private ObjectInspector inspector;
 
   private int numConnection;
 
   private Period readTimeout;
 
+  private ObjectInspector inspector;
+
   @Override
   public void initialize(Configuration configuration, Properties properties) throws SerDeException {
     final List<String> columnNames = new ArrayList<>();
@@ -96,56 +108,93 @@ public class DruidSerDe extends AbstractSerDe {
     // Druid query
     String druidQuery = properties.getProperty(Constants.DRUID_QUERY_JSON);
     if (druidQuery == null) {
-      // No query. We need to create a Druid Segment Metadata query that retrieves all
-      // columns present in the data source (dimensions and metrics).
-      // Create Segment Metadata Query
-      String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
-      if (dataSource == null) {
-        throw new SerDeException("Druid data source not specified; use " +
-                Constants.DRUID_DATA_SOURCE + " in table properties");
-      }
-      SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
-      builder.dataSource(dataSource);
-      builder.merge(true);
-      builder.analysisTypes();
-      SegmentMetadataQuery query = builder.build();
+      // No query. Either it is a CTAS, or we need to create a Druid
+      // Segment Metadata query that retrieves all columns present in
+      // the data source (dimensions and metrics).
+      if (!org.apache.commons.lang3.StringUtils
+              .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMNS))
+              && !org.apache.commons.lang3.StringUtils
+              .isEmpty(properties.getProperty(serdeConstants.LIST_COLUMN_TYPES))) {
+        columnNames.addAll(Utilities.getColumnNames(properties));
+        if (!columnNames.contains(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+          throw new SerDeException("Timestamp column ('" + DruidTable.DEFAULT_TIMESTAMP_COLUMN +
+                  "') not specified in create table; list of columns is: " +
+                  properties.getProperty(serdeConstants.LIST_COLUMNS));
+        }
+        columnTypes.addAll(Lists.transform(Utilities.getColumnTypes(properties),
+                new Function<String, PrimitiveTypeInfo>() {
+                  @Override
+                  public PrimitiveTypeInfo apply(String type) {
+                    return TypeInfoFactory.getPrimitiveTypeInfo(type);
+                  }
+                }
+        ));
+        inspectors.addAll(Lists.transform(columnTypes,
+                new Function<PrimitiveTypeInfo, ObjectInspector>() {
+                  @Override
+                  public ObjectInspector apply(PrimitiveTypeInfo type) {
+                    return PrimitiveObjectInspectorFactory
+                            .getPrimitiveWritableObjectInspector(type);
+                  }
+                }
+        ));
+        columns = columnNames.toArray(new String[columnNames.size()]);
+        types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+        inspector = ObjectInspectorFactory
+                .getStandardStructObjectInspector(columnNames, inspectors);
+      } else {
+        String dataSource = properties.getProperty(Constants.DRUID_DATA_SOURCE);
+        if (dataSource == null) {
+          throw new SerDeException("Druid data source not specified; use " +
+                  Constants.DRUID_DATA_SOURCE + " in table properties");
+        }
+        SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
+        builder.dataSource(dataSource);
+        builder.merge(true);
+        builder.analysisTypes();
+        SegmentMetadataQuery query = builder.build();
 
-      // Execute query in Druid
-      String address = HiveConf.getVar(configuration,
-              HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
-      if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
-        throw new SerDeException("Druid broker address not specified in configuration");
-      }
+        // Execute query in Druid
+        String address = HiveConf.getVar(configuration,
+                HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
+        );
+        if (org.apache.commons.lang3.StringUtils.isEmpty(address)) {
+          throw new SerDeException("Druid broker address not specified in configuration");
+        }
 
       numConnection = HiveConf
               .getIntVar(configuration, HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
       readTimeout = new Period(
               HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
-      // Infer schema
-      SegmentAnalysis schemaInfo;
-      try {
-        schemaInfo = submitMetadataRequest(address, query);
-      } catch (IOException e) {
-        throw new SerDeException(e);
-      }
-      for (Entry<String,ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
-        if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
-          // Special handling for timestamp column
+
+        // Infer schema
+        SegmentAnalysis schemaInfo;
+        try {
+          schemaInfo = submitMetadataRequest(address, query);
+        } catch (IOException e) {
+          throw new SerDeException(e);
+        }
+        for (Entry<String, ColumnAnalysis> columnInfo : schemaInfo.getColumns().entrySet()) {
+          if (columnInfo.getKey().equals(DruidTable.DEFAULT_TIMESTAMP_COLUMN)) {
+            // Special handling for timestamp column
+            columnNames.add(columnInfo.getKey()); // field name
+            PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+            columnTypes.add(type);
+            inspectors
+                    .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+            continue;
+          }
           columnNames.add(columnInfo.getKey()); // field name
-          PrimitiveTypeInfo type = TypeInfoFactory.timestampTypeInfo; // field type
+          PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
+                  columnInfo.getValue().getType()); // field type
           columnTypes.add(type);
           inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
-          continue;
         }
-        columnNames.add(columnInfo.getKey()); // field name
-        PrimitiveTypeInfo type = DruidSerDeUtils.convertDruidToHiveType(
-                columnInfo.getValue().getType()); // field type
-        columnTypes.add(type);
-        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(type));
+        columns = columnNames.toArray(new String[columnNames.size()]);
+        types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
+        inspector = ObjectInspectorFactory
+                .getStandardStructObjectInspector(columnNames, inspectors);
       }
-      columns = columnNames.toArray(new String[columnNames.size()]);
-      types = columnTypes.toArray(new PrimitiveTypeInfo[columnTypes.size()]);
-      inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
     } else {
       // Query is specified, we can extract the results schema from the query
       Query<?> query;
@@ -171,13 +220,14 @@ public class DruidSerDe extends AbstractSerDe {
         default:
           throw new SerDeException("Not supported Druid query");
       }
-    
+
       columns = new String[columnNames.size()];
       types = new PrimitiveTypeInfo[columnNames.size()];
       for (int i = 0; i < columnTypes.size(); ++i) {
         columns[i] = columnNames.get(i);
         types[i] = columnTypes.get(i);
-        inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
+        inspectors
+                .add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i]));
       }
       inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
     }
@@ -192,22 +242,29 @@ public class DruidSerDe extends AbstractSerDe {
   /* Submits the request and returns */
   protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQuery query)
           throws SerDeException, IOException {
+    final Lifecycle lifecycle = new Lifecycle();
     HttpClient client = HttpClientInit.createClient(
             HttpClientConfig.builder().withNumConnections(numConnection)
-                    .withReadTimeout(readTimeout.toStandardDuration()).build(), new Lifecycle());
+                    .withReadTimeout(readTimeout.toStandardDuration()).build(), lifecycle);
     InputStream response;
     try {
+      lifecycle.start();
       response = DruidStorageHandlerUtils.submitRequest(client,
-              DruidStorageHandlerUtils.createRequest(address, query));
+              DruidStorageHandlerUtils.createRequest(address, query)
+      );
     } catch (Exception e) {
       throw new SerDeException(StringUtils.stringifyException(e));
+    } finally {
+      lifecycle.stop();
     }
 
     // Retrieve results
     List<SegmentAnalysis> resultsList;
     try {
       resultsList = DruidStorageHandlerUtils.SMILE_MAPPER.readValue(response,
-              new TypeReference<List<SegmentAnalysis>>() {});
+              new TypeReference<List<SegmentAnalysis>>() {
+              }
+      );
     } catch (Exception e) {
       response.close();
       throw new SerDeException(StringUtils.stringifyException(e));
@@ -224,7 +281,8 @@ public class DruidSerDe extends AbstractSerDe {
 
   /* Timeseries query */
   private void inferSchema(TimeseriesQuery query, List<String> columnNames,
-          List<PrimitiveTypeInfo> columnTypes) {
+          List<PrimitiveTypeInfo> columnTypes
+  ) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -241,7 +299,9 @@ public class DruidSerDe extends AbstractSerDe {
   }
 
   /* TopN query */
-  private void inferSchema(TopNQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+  private void inferSchema(TopNQuery query, List<String> columnNames,
+          List<PrimitiveTypeInfo> columnTypes
+  ) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -262,7 +322,8 @@ public class DruidSerDe extends AbstractSerDe {
 
   /* Select query */
   private void inferSchema(SelectQuery query, List<String> columnNames,
-          List<PrimitiveTypeInfo> columnTypes) {
+          List<PrimitiveTypeInfo> columnTypes
+  ) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -279,7 +340,9 @@ public class DruidSerDe extends AbstractSerDe {
   }
 
   /* GroupBy query */
-  private void inferSchema(GroupByQuery query, List<String> columnNames, List<PrimitiveTypeInfo> columnTypes) {
+  private void inferSchema(GroupByQuery query, List<String> columnNames,
+          List<PrimitiveTypeInfo> columnTypes
+  ) {
     // Timestamp column
     columnNames.add(DruidTable.DEFAULT_TIMESTAMP_COLUMN);
     columnTypes.add(TypeInfoFactory.timestampTypeInfo);
@@ -302,17 +365,67 @@ public class DruidSerDe extends AbstractSerDe {
 
   @Override
   public Class<? extends Writable> getSerializedClass() {
-    return NullWritable.class;
+    return DruidWritable.class;
   }
 
   @Override
   public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
-    return NullWritable.get();
+    if (objectInspector.getCategory() != ObjectInspector.Category.STRUCT) {
+      throw new SerDeException(getClass().toString()
+              + " can only serialize struct types, but we got: "
+              + objectInspector.getTypeName());
+    }
+
+    // Prepare the field ObjectInspectors
+    StructObjectInspector soi = (StructObjectInspector) objectInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> values = soi.getStructFieldsDataAsList(o);
+    // Build the Druid-friendly map of column name -> value for this row
+    Map<String, Object> value = new HashMap<>();
+    for (int i = 0; i < columns.length; i++) {
+      if (values.get(i) == null) {
+        // null value: store it as-is
+        value.put(columns[i], null);
+        continue;
+      }
+      final Object res;
+      switch (types[i].getPrimitiveCategory()) {
+        case TIMESTAMP:
+          res = ((TimestampObjectInspector) fields.get(i).getFieldObjectInspector())
+                  .getPrimitiveJavaObject(
+                          values.get(i)).getTime();
+          break;
+        case LONG:
+          res = ((LongObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+          break;
+        case FLOAT:
+          res = ((FloatObjectInspector) fields.get(i).getFieldObjectInspector()).get(values.get(i));
+          break;
+        case DOUBLE:
+          res = ((DoubleObjectInspector) fields.get(i).getFieldObjectInspector())
+                  .get(values.get(i));
+          break;
+        case STRING:
+          res = ((StringObjectInspector) fields.get(i).getFieldObjectInspector())
+                  .getPrimitiveJavaObject(
+                          values.get(i));
+          break;
+        default:
+          throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
+      }
+      value.put(columns[i], res);
+    }
+    value.put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+            ((TimestampObjectInspector) fields.get(columns.length).getFieldObjectInspector())
+                    .getPrimitiveJavaObject(values.get(columns.length)).getTime()
+    );
+    return new DruidWritable(value);
   }
 
   @Override
   public SerDeStats getSerDeStats() {
-    throw new UnsupportedOperationException("SerdeStats not supported.");
+    // no support for statistics
+    return null;
   }
 
   @Override
@@ -327,13 +440,16 @@ public class DruidSerDe extends AbstractSerDe {
       }
       switch (types[i].getPrimitiveCategory()) {
         case TIMESTAMP:
-          output.add(new TimestampWritable(new Timestamp((Long)value)));
+          output.add(new TimestampWritable(new Timestamp((Long) value)));
           break;
         case LONG:
-          output.add(new LongWritable(((Number)value).longValue()));
+          output.add(new LongWritable(((Number) value).longValue()));
           break;
         case FLOAT:
-          output.add(new FloatWritable(((Number)value).floatValue()));
+          output.add(new FloatWritable(((Number) value).floatValue()));
+          break;
+        case DOUBLE:
+          output.add(new DoubleWritable(((Number) value).doubleValue()));
           break;
         case STRING:
           output.add(new Text(value.toString()));
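
For reference, the deserialization path above maps each Druid value onto the matching Hive writable. Below is a minimal, standalone Java sketch of that mapping; the class and helper names are illustrative only (not part of the patch), and it assumes hadoop-common plus hive-serde on the classpath.

import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.sql.Timestamp;

public class DruidToWritableSketch {

  // Hypothetical helper mirroring the switch in DruidSerDe#deserialize.
  static Writable toWritable(PrimitiveTypeInfo type, Object value) {
    switch (type.getPrimitiveCategory()) {
      case TIMESTAMP:
        // Druid returns the event time as epoch millis.
        return new TimestampWritable(new Timestamp(((Number) value).longValue()));
      case LONG:
        return new LongWritable(((Number) value).longValue());
      case FLOAT:
        return new FloatWritable(((Number) value).floatValue());
      case DOUBLE:
        // Use doubleValue() so no precision is lost.
        return new DoubleWritable(((Number) value).doubleValue());
      case STRING:
        return new Text(value.toString());
      default:
        throw new IllegalArgumentException("Unknown type: " + type.getPrimitiveCategory());
    }
  }

  public static void main(String[] args) {
    System.out.println(toWritable(TypeInfoFactory.doubleTypeInfo, 1.25d));   // prints 1.25
    System.out.println(toWritable(TypeInfoFactory.stringTypeInfo, "druid")); // prints druid
  }
}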

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
index 29b8845..64a19f6 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDeUtils.java
@@ -31,14 +31,16 @@ public final class DruidSerDeUtils {
   private static final Logger LOG = LoggerFactory.getLogger(DruidSerDeUtils.class);
 
   protected static final String FLOAT_TYPE = "FLOAT";
+
   protected static final String LONG_TYPE = "LONG";
+
   protected static final String STRING_TYPE = "STRING";
 
   /* This method converts from the String representation of Druid type
    * to the corresponding Hive type */
   public static PrimitiveTypeInfo convertDruidToHiveType(String typeName) {
     typeName = typeName.toUpperCase();
-    switch(typeName) {
+    switch (typeName) {
       case FLOAT_TYPE:
         return TypeInfoFactory.floatTypeInfo;
       case LONG_TYPE:
@@ -61,7 +63,7 @@ public final class DruidSerDeUtils {
    * to the String representation of the corresponding Hive type */
   public static String convertDruidToHiveTypeString(String typeName) {
     typeName = typeName.toUpperCase();
-    switch(typeName) {
+    switch (typeName) {
       case FLOAT_TYPE:
         return serdeConstants.FLOAT_TYPE_NAME;
       case LONG_TYPE:
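
As a quick illustration of the mapping in DruidSerDeUtils, a standalone sketch (the class name is made up, and it assumes LONG and STRING resolve to the corresponding Hive TypeInfos just as FLOAT does above):

import org.apache.hadoop.hive.druid.serde.DruidSerDeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class DruidTypeMappingSketch {
  public static void main(String[] args) {
    // Druid reports column types as strings; the lookup is case-insensitive
    // because convertDruidToHiveType upper-cases its argument first.
    System.out.println(DruidSerDeUtils.convertDruidToHiveType("FLOAT")
            .equals(TypeInfoFactory.floatTypeInfo));   // true
    System.out.println(DruidSerDeUtils.convertDruidToHiveType("long")
            .equals(TypeInfoFactory.longTypeInfo));    // expected true
    System.out.println(DruidSerDeUtils.convertDruidToHiveType("string")
            .equals(TypeInfoFactory.stringTypeInfo));  // expected true
  }
}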

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
index b91178c..8c2fb10 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTimeseriesQueryRecordReader.java
@@ -45,9 +45,12 @@ public class DruidTimeseriesQueryRecordReader
   }
 
   @Override
-  protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content) throws IOException {
+  protected List<Result<TimeseriesResultValue>> createResultsList(InputStream content)
+          throws IOException {
     return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Result<TimeseriesResultValue>>>(){});
+            new TypeReference<List<Result<TimeseriesResultValue>>>() {
+            }
+    );
   }
 
   @Override
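
The anonymous TypeReference subclasses used in these record readers are what let Jackson recover the full generic element type at runtime; passing List.class instead would deserialize into untyped maps. A minimal, self-contained sketch of the same pattern with plain JSON (the Row class is illustrative, not part of the patch):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.List;

public class TypeReferenceSketch {

  // Simple value type standing in for Result<TimeseriesResultValue>.
  public static class Row {
    public long timestamp;
    public double metric;
  }

  public static void main(String[] args) throws IOException {
    String json = "[{\"timestamp\":1450000000000,\"metric\":1.5}]";
    ObjectMapper mapper = new ObjectMapper();
    // The anonymous subclass captures List<Row>, so Jackson binds each
    // element to Row instead of a LinkedHashMap.
    List<Row> rows = mapper.readValue(json, new TypeReference<List<Row>>() {
    });
    System.out.println(rows.get(0).metric); // prints 1.5
  }
}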

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
index 22599c3..d431925 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java
@@ -41,6 +41,7 @@ public class DruidTopNQueryRecordReader
         extends DruidQueryRecordReader<TopNQuery, Result<TopNResultValue>> {
 
   private Result<TopNResultValue> current;
+
   private Iterator<DimensionAndMetricValueExtractor> values = Iterators.emptyIterator();
 
   @Override
@@ -49,9 +50,12 @@ public class DruidTopNQueryRecordReader
   }
 
   @Override
-  protected List<Result<TopNResultValue>> createResultsList(InputStream content) throws IOException {
+  protected List<Result<TopNResultValue>> createResultsList(InputStream content)
+          throws IOException {
     return DruidStorageHandlerUtils.SMILE_MAPPER.readValue(content,
-            new TypeReference<List<Result<TopNResultValue>>>(){});
+            new TypeReference<List<Result<TopNResultValue>>>() {
+            }
+    );
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
new file mode 100644
index 0000000..8770749
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
@@ -0,0 +1,181 @@
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.indexer.JobHelper;
+import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.Handle;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+public class DruidStorageHandlerTest {
+
+  @Rule
+  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private static final String DATA_SOURCE_NAME = "testName";
+
+  private String segmentsTable;
+
+  private String tablePath;
+
+  private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
+          .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
+
+  @Before
+  public void before() throws Throwable {
+    tablePath = temporaryFolder.newFolder().getAbsolutePath();
+    segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+    Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
+    Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
+    Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0);
+    StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(storageDes.getBucketColsSize()).thenReturn(0);
+    Mockito.when(tableMock.getSd()).thenReturn(storageDes);
+    Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME);
+  }
+
+  Table tableMock = Mockito.mock(Table.class);
+
+  @Test
+  public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+    );
+
+    try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
+      Assert.assertFalse(derbyConnectorRule.getConnector()
+              .tableExists(handle,
+                      segmentsTable
+              ));
+      druidStorageHandler.preCreateTable(tableMock);
+      Assert.assertTrue(derbyConnectorRule.getConnector()
+              .tableExists(handle,
+                      segmentsTable
+              ));
+    }
+
+  }
+
+  @Test(expected = MetaException.class)
+  public void testPreCreateTableWhenDataSourceExists() throws MetaException {
+    derbyConnectorRule.getConnector().createSegmentTable();
+    SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(
+            derbyConnectorRule.getConnector());
+    sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment),
+            DruidStorageHandlerUtils.JSON_MAPPER
+    );
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+  }
+
+  @Test
+  public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
+          throws MetaException, IOException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+    Configuration config = new Configuration();
+    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tablePath);
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    /*
+    The final descriptor path has the form tablePath/taskId_attemptId/segmentsDescriptorDir/segmentIdentifier.json;
+    the random HIVEQUERYID set above stands in for the taskId_attemptId part.
+    */
+    Path taskDirPath = new Path(tablePath, druidStorageHandler.makeStagingName());
+    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+            new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+    druidStorageHandler.commitCreateTable(tableMock);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+                    derbyConnectorRule.metadataTablesConfigSupplier().get()
+            )).toArray());
+    druidStorageHandler.commitDropTable(tableMock, false);
+    Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+                    derbyConnectorRule.metadataTablesConfigSupplier().get()
+            )).toArray());
+
+  }
+
+  @Test
+  public void testDeleteSegment() throws IOException, SegmentLoadingException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get()
+    );
+
+    String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
+    Configuration config = new Configuration();
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+
+    Path segmentOutputPath = JobHelper
+            .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment);
+    Path indexPath = new Path(segmentOutputPath, "index.zip");
+    DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
+            ImmutableMap.<String, Object>of("path", indexPath)).build();
+    OutputStream outputStream = localFileSystem.create(indexPath, true);
+    outputStream.close();
+    Assert.assertTrue("index file is not created ??", localFileSystem.exists(indexPath));
+    Assert.assertTrue(localFileSystem.exists(segmentOutputPath));
+
+    druidStorageHandler.deleteSegment(dataSegmentWithLoadspect);
+    // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
+    Assert.assertFalse("Index file still there ??", localFileSystem.exists(indexPath));
+    // path format of segmentOutputPath -- > .../dataSource/interval/version/partitionNum/
+    Assert.assertFalse("PartitionNum directory still there ??",
+            localFileSystem.exists(segmentOutputPath)
+    );
+    Assert.assertFalse("Version directory still there ??",
+            localFileSystem.exists(segmentOutputPath.getParent())
+    );
+    Assert.assertFalse("Interval directory still there ??",
+            localFileSystem.exists(segmentOutputPath.getParent().getParent())
+    );
+    Assert.assertFalse("Data source directory still there ??",
+            localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
+    );
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
index 2b4df78..8dc8091 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidSerDe.java
@@ -41,34 +41,34 @@ public class QTestDruidSerDe extends DruidSerDe {
   //        + "\"usingDefaultInterval\":true,\"lenientAggregatorMerge\":false,\"descending\":false}";
   private static final String RESPONSE =
           "[ {\r\n "
-          + " \"id\" : \"merged\",\r\n "
-          + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n "
-          + " \"columns\" : {\r\n  "
-          + "  \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
-          + "  \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n  "
-          + "  \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
-          + "  \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
-          + "  \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
-          + "  \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
-          + "  \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
-          + "  \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n "
-          + " },\r\n "
-          + " \"aggregators\" : {\r\n  "
-          + "  \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n  "
-          + "  \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n  "
-          + "  \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n  "
-          + "  \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n  "
-          + "  \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n "
-          + " },\r\n "
-          + " \"queryGranularity\" : {\r\n    \"type\": \"none\"\r\n  },\r\n "
-          + " \"size\" : 300000,\r\n "
-          + " \"numRows\" : 5000000\r\n} ]";
+                  + " \"id\" : \"merged\",\r\n "
+                  + " \"intervals\" : [ \"2010-01-01T00:00:00.000Z/2015-12-31T00:00:00.000Z\" ],\r\n "
+                  + " \"columns\" : {\r\n  "
+                  + "  \"__time\" : { \"type\" : \"LONG\", \"hasMultipleValues\" : false, \"size\" : 407240380, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+                  + "  \"robot\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"namespace\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : true, \"size\" : 100000, \"cardinality\" : 1504, \"errorMessage\" : null },\r\n  "
+                  + "  \"anonymous\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"unpatrolled\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"page\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"language\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"newpage\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"user\" : { \"type\" : \"STRING\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : 1944, \"errorMessage\" : null },\r\n  "
+                  + "  \"count\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+                  + "  \"added\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+                  + "  \"delta\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+                  + "  \"variation\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null },\r\n  "
+                  + "  \"deleted\" : { \"type\" : \"FLOAT\", \"hasMultipleValues\" : false, \"size\" : 100000, \"cardinality\" : null, \"errorMessage\" : null }\r\n "
+                  + " },\r\n "
+                  + " \"aggregators\" : {\r\n  "
+                  + "  \"count\" : { \"type\" : \"longSum\", \"name\" : \"count\", \"fieldName\" : \"count\" },\r\n  "
+                  + "  \"added\" : { \"type\" : \"doubleSum\", \"name\" : \"added\", \"fieldName\" : \"added\" },\r\n  "
+                  + "  \"delta\" : { \"type\" : \"doubleSum\", \"name\" : \"delta\", \"fieldName\" : \"delta\" },\r\n  "
+                  + "  \"variation\" : { \"type\" : \"doubleSum\", \"name\" : \"variation\", \"fieldName\" : \"variation\" },\r\n  "
+                  + "  \"deleted\" : { \"type\" : \"doubleSum\", \"name\" : \"deleted\", \"fieldName\" : \"deleted\" }\r\n "
+                  + " },\r\n "
+                  + " \"queryGranularity\" : {\r\n    \"type\": \"none\"\r\n  },\r\n "
+                  + " \"size\" : 300000,\r\n "
+                  + " \"numRows\" : 5000000\r\n} ]";
 
   /* Submits the request and returns */
   @Override
@@ -78,7 +78,9 @@ public class QTestDruidSerDe extends DruidSerDe {
     List<SegmentAnalysis> resultsList;
     try {
       resultsList = DruidStorageHandlerUtils.JSON_MAPPER.readValue(RESPONSE,
-            new TypeReference<List<SegmentAnalysis>>() {});
+              new TypeReference<List<SegmentAnalysis>>() {
+              }
+      );
     } catch (Exception e) {
       throw new SerDeException(StringUtils.stringifyException(e));
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/590687bc/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
new file mode 100644
index 0000000..75c0129
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
@@ -0,0 +1,108 @@
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.storage.derby.DerbyConnector;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+
+import java.sql.SQLException;
+import java.util.UUID;
+
+public class TestDerbyConnector extends DerbyConnector {
+  private final String jdbcUri;
+
+  public TestDerbyConnector(
+          Supplier<MetadataStorageConnectorConfig> config,
+          Supplier<MetadataStorageTablesConfig> dbTables
+  ) {
+    this(config, dbTables, "jdbc:derby:memory:druidTest" + dbSafeUUID());
+  }
+
+  protected TestDerbyConnector(
+          Supplier<MetadataStorageConnectorConfig> config,
+          Supplier<MetadataStorageTablesConfig> dbTables,
+          String jdbcUri
+  ) {
+    super(config, dbTables, new DBI(jdbcUri + ";create=true"));
+    this.jdbcUri = jdbcUri;
+  }
+
+  public void tearDown() {
+    try {
+      new DBI(jdbcUri + ";drop=true").open().close();
+    } catch (UnableToObtainConnectionException e) {
+      SQLException cause = (SQLException) e.getCause();
+      // error code "08006" indicates proper shutdown
+      Assert.assertEquals(String.format("Derby not shutdown: [%s]", cause.toString()), "08006",
+              cause.getSQLState()
+      );
+    }
+  }
+
+  public static String dbSafeUUID() {
+    return UUID.randomUUID().toString().replace("-", "");
+  }
+
+  public String getJdbcUri() {
+    return jdbcUri;
+  }
+
+  public static class DerbyConnectorRule extends ExternalResource {
+    private TestDerbyConnector connector;
+
+    private final Supplier<MetadataStorageTablesConfig> dbTables;
+
+    private final MetadataStorageConnectorConfig connectorConfig;
+
+    public DerbyConnectorRule() {
+      this("druidTest" + dbSafeUUID());
+    }
+
+    private DerbyConnectorRule(
+            final String defaultBase
+    ) {
+      this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase)));
+    }
+
+    public DerbyConnectorRule(
+            Supplier<MetadataStorageTablesConfig> dbTables
+    ) {
+      this.dbTables = dbTables;
+      this.connectorConfig = new MetadataStorageConnectorConfig() {
+        @Override
+        public String getConnectURI() {
+          return connector.getJdbcUri();
+        }
+      };
+    }
+
+    @Override
+    protected void before() throws Throwable {
+      connector = new TestDerbyConnector(Suppliers.ofInstance(connectorConfig), dbTables);
+      connector.getDBI().open().close(); // create db
+    }
+
+    @Override
+    protected void after() {
+      connector.tearDown();
+    }
+
+    public TestDerbyConnector getConnector() {
+      return connector;
+    }
+
+    public MetadataStorageConnectorConfig getMetadataConnectorConfig() {
+      return connectorConfig;
+    }
+
+    public Supplier<MetadataStorageTablesConfig> metadataTablesConfigSupplier() {
+      return dbTables;
+    }
+  }
+
+}