Posted to commits@hive.apache.org by ha...@apache.org on 2017/02/25 19:54:58 UTC

hive git commit: HIVE-15702 : Test timeout : TestDerbyConnector (Slim Bouguerra via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 10449a7af -> 2aa054a4d


HIVE-15702 : Test timeout : TestDerbyConnector (Slim Bouguerra via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2aa054a4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2aa054a4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2aa054a4

Branch: refs/heads/master
Commit: 2aa054a4d5aaf516839a3574cf600765288d98b0
Parents: 10449a7
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Sat Feb 25 11:54:06 2017 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sat Feb 25 11:54:06 2017 -0800

----------------------------------------------------------------------
 .../hive/druid/DerbyConnectorTestUtility.java   | 126 ++++++++++
 .../hive/druid/DruidStorageHandlerTest.java     | 224 -----------------
 .../hadoop/hive/druid/TestDerbyConnector.java   | 126 ----------
 .../hive/druid/TestDruidStorageHandler.java     | 224 +++++++++++++++++
 .../hive/ql/io/DruidRecordWriterTest.java       | 239 -------------------
 .../hive/ql/io/TestDruidRecordWriter.java       | 239 +++++++++++++++++++
 6 files changed, 589 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java
new file mode 100644
index 0000000..f9304a5
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.storage.derby.DerbyConnector;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+
+import java.sql.SQLException;
+import java.util.UUID;
+
+public class DerbyConnectorTestUtility extends DerbyConnector {
+  private final String jdbcUri;
+
+  public DerbyConnectorTestUtility(
+          Supplier<MetadataStorageConnectorConfig> config,
+          Supplier<MetadataStorageTablesConfig> dbTables
+  ) {
+    this(config, dbTables, "jdbc:derby:memory:druidTest" + dbSafeUUID());
+  }
+
+  protected DerbyConnectorTestUtility(
+          Supplier<MetadataStorageConnectorConfig> config,
+          Supplier<MetadataStorageTablesConfig> dbTables,
+          String jdbcUri
+  ) {
+    super(config, dbTables, new DBI(jdbcUri + ";create=true"));
+    this.jdbcUri = jdbcUri;
+  }
+
+  public void tearDown() {
+    try {
+      new DBI(jdbcUri + ";drop=true").open().close();
+    } catch (UnableToObtainConnectionException e) {
+      SQLException cause = (SQLException) e.getCause();
+      // error code "08006" indicates proper shutdown
+      Assert.assertEquals(String.format("Derby not shutdown: [%s]", cause.toString()), "08006",
+              cause.getSQLState()
+      );
+    }
+  }
+
+  public static String dbSafeUUID() {
+    return UUID.randomUUID().toString().replace("-", "");
+  }
+
+  public String getJdbcUri() {
+    return jdbcUri;
+  }
+
+  public static class DerbyConnectorRule extends ExternalResource {
+    private DerbyConnectorTestUtility connector;
+
+    private final Supplier<MetadataStorageTablesConfig> dbTables;
+
+    private final MetadataStorageConnectorConfig connectorConfig;
+
+    public DerbyConnectorRule() {
+      this("druidTest" + dbSafeUUID());
+    }
+
+    private DerbyConnectorRule(
+            final String defaultBase
+    ) {
+      this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase)));
+    }
+
+    public DerbyConnectorRule(
+            Supplier<MetadataStorageTablesConfig> dbTables
+    ) {
+      this.dbTables = dbTables;
+      this.connectorConfig = new MetadataStorageConnectorConfig() {
+        @Override
+        public String getConnectURI() {
+          return connector.getJdbcUri();
+        }
+      };
+    }
+
+    @Override
+    protected void before() throws Throwable {
+      connector = new DerbyConnectorTestUtility(Suppliers.ofInstance(connectorConfig), dbTables);
+      connector.getDBI().open().close(); // create db
+    }
+
+    @Override
+    protected void after() {
+      connector.tearDown();
+    }
+
+    public DerbyConnectorTestUtility getConnector() {
+      return connector;
+    }
+
+    public MetadataStorageConnectorConfig getMetadataConnectorConfig() {
+      return connectorConfig;
+    }
+
+    public Supplier<MetadataStorageTablesConfig> metadataTablesConfigSupplier() {
+      return dbTables;
+    }
+  }
+
+}
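
For context only (not part of this commit): a minimal JUnit 4 sketch of how the renamed DerbyConnectorTestUtility.DerbyConnectorRule is typically consumed, mirroring the @Rule usage in TestDruidStorageHandler further down. The test class and method names here are illustrative; only the rule, getConnector(), and getJdbcUri() come from the file above.

  import org.apache.hadoop.hive.druid.DerbyConnectorTestUtility;
  import org.junit.Assert;
  import org.junit.Rule;
  import org.junit.Test;

  public class ExampleDerbyBackedTest {

    // The rule is an ExternalResource: before() creates a fresh in-memory
    // Derby metadata store, after() drops it via tearDown().
    @Rule
    public final DerbyConnectorTestUtility.DerbyConnectorRule derbyConnectorRule =
            new DerbyConnectorTestUtility.DerbyConnectorRule();

    @Test
    public void testConnectorIsAvailable() {
      DerbyConnectorTestUtility connector = derbyConnectorRule.getConnector();
      Assert.assertNotNull(connector);
      // Each rule instance points at a uniquely named in-memory database.
      Assert.assertTrue(connector.getJdbcUri().startsWith("jdbc:derby:memory:druidTest"));
    }
  }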

http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
deleted file mode 100644
index 3573bf9..0000000
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.druid;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import io.druid.indexer.JobHelper;
-import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
-import io.druid.segment.loading.SegmentLoadingException;
-import io.druid.timeline.DataSegment;
-import io.druid.timeline.partition.NoneShardSpec;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
-import org.skife.jdbi.v2.Handle;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.UUID;
-
-public class DruidStorageHandlerTest {
-
-  @Rule
-  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
-
-  @Rule
-  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-  private static final String DATA_SOURCE_NAME = "testName";
-
-  private String segmentsTable;
-
-  private String tableWorkingPath;
-
-  private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
-          .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
-
-  @Before
-  public void before() throws Throwable {
-    tableWorkingPath = temporaryFolder.newFolder().getAbsolutePath();
-    segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
-    Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
-    Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
-    Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0);
-    StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class);
-    Mockito.when(storageDes.getBucketColsSize()).thenReturn(0);
-    Mockito.when(tableMock.getSd()).thenReturn(storageDes);
-    Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME);
-  }
-
-  Table tableMock = Mockito.mock(Table.class);
-
-  @Test
-  public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
-    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
-            derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
-    );
-
-    try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
-      Assert.assertFalse(derbyConnectorRule.getConnector()
-              .tableExists(handle,
-                      segmentsTable
-              ));
-      druidStorageHandler.preCreateTable(tableMock);
-      Assert.assertTrue(derbyConnectorRule.getConnector()
-              .tableExists(handle,
-                      segmentsTable
-              ));
-    }
-
-  }
-
-  @Test(expected = MetaException.class)
-  public void testPreCreateTableWhenDataSourceExists() throws MetaException {
-    derbyConnectorRule.getConnector().createSegmentTable();
-    SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(
-            derbyConnectorRule.getConnector());
-    sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment),
-            DruidStorageHandlerUtils.JSON_MAPPER
-    );
-    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
-            derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
-    );
-    druidStorageHandler.preCreateTable(tableMock);
-  }
-
-  @Test
-  public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
-          throws MetaException, IOException {
-    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
-            derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
-    );
-    druidStorageHandler.preCreateTable(tableMock);
-    Configuration config = new Configuration();
-    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
-    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
-    druidStorageHandler.setConf(config);
-    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
-    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
-    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
-            new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
-    );
-    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
-    druidStorageHandler.commitCreateTable(tableMock);
-    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
-            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
-                    derbyConnectorRule.metadataTablesConfigSupplier().get()
-            )).toArray());
-    druidStorageHandler.commitDropTable(tableMock, false);
-    Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
-            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
-                    derbyConnectorRule.metadataTablesConfigSupplier().get()
-            )).toArray());
-
-  }
-
-  @Test
-  public void testCommitInsertTable() throws MetaException, IOException {
-    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
-            derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
-    );
-    druidStorageHandler.preCreateTable(tableMock);
-    Configuration config = new Configuration();
-    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
-    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
-    druidStorageHandler.setConf(config);
-    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
-    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
-    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
-            new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
-    );
-    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
-    druidStorageHandler.commitCreateTable(tableMock);
-    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
-            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
-                    derbyConnectorRule.metadataTablesConfigSupplier().get()
-            )).toArray());
-  }
-
-  @Test
-  public void testDeleteSegment() throws IOException, SegmentLoadingException {
-    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
-            derbyConnectorRule.getConnector(),
-            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
-            derbyConnectorRule.metadataTablesConfigSupplier().get(),
-            null
-    );
-
-    String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
-    Configuration config = new Configuration();
-    druidStorageHandler.setConf(config);
-    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
-
-    Path segmentOutputPath = JobHelper
-            .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment);
-    Path indexPath = new Path(segmentOutputPath, "index.zip");
-    DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
-            ImmutableMap.<String, Object>of("path", indexPath)).build();
-    OutputStream outputStream = localFileSystem.create(indexPath, true);
-    outputStream.close();
-    Assert.assertTrue("index file is not created ??", localFileSystem.exists(indexPath));
-    Assert.assertTrue(localFileSystem.exists(segmentOutputPath));
-
-    druidStorageHandler.deleteSegment(dataSegmentWithLoadspect);
-    // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
-    Assert.assertFalse("Index file still there ??", localFileSystem.exists(indexPath));
-    // path format of segmentOutputPath -- > .../dataSource/interval/version/partitionNum/
-    Assert.assertFalse("PartitionNum directory still there ??",
-            localFileSystem.exists(segmentOutputPath)
-    );
-    Assert.assertFalse("Version directory still there ??",
-            localFileSystem.exists(segmentOutputPath.getParent())
-    );
-    Assert.assertFalse("Interval directory still there ??",
-            localFileSystem.exists(segmentOutputPath.getParent().getParent())
-    );
-    Assert.assertFalse("Data source directory still there ??",
-            localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
deleted file mode 100644
index 1014ab6..0000000
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.druid;
-
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import io.druid.metadata.MetadataStorageConnectorConfig;
-import io.druid.metadata.MetadataStorageTablesConfig;
-import io.druid.metadata.storage.derby.DerbyConnector;
-import org.junit.Assert;
-import org.junit.rules.ExternalResource;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
-
-import java.sql.SQLException;
-import java.util.UUID;
-
-public class TestDerbyConnector extends DerbyConnector {
-  private final String jdbcUri;
-
-  public TestDerbyConnector(
-          Supplier<MetadataStorageConnectorConfig> config,
-          Supplier<MetadataStorageTablesConfig> dbTables
-  ) {
-    this(config, dbTables, "jdbc:derby:memory:druidTest" + dbSafeUUID());
-  }
-
-  protected TestDerbyConnector(
-          Supplier<MetadataStorageConnectorConfig> config,
-          Supplier<MetadataStorageTablesConfig> dbTables,
-          String jdbcUri
-  ) {
-    super(config, dbTables, new DBI(jdbcUri + ";create=true"));
-    this.jdbcUri = jdbcUri;
-  }
-
-  public void tearDown() {
-    try {
-      new DBI(jdbcUri + ";drop=true").open().close();
-    } catch (UnableToObtainConnectionException e) {
-      SQLException cause = (SQLException) e.getCause();
-      // error code "08006" indicates proper shutdown
-      Assert.assertEquals(String.format("Derby not shutdown: [%s]", cause.toString()), "08006",
-              cause.getSQLState()
-      );
-    }
-  }
-
-  public static String dbSafeUUID() {
-    return UUID.randomUUID().toString().replace("-", "");
-  }
-
-  public String getJdbcUri() {
-    return jdbcUri;
-  }
-
-  public static class DerbyConnectorRule extends ExternalResource {
-    private TestDerbyConnector connector;
-
-    private final Supplier<MetadataStorageTablesConfig> dbTables;
-
-    private final MetadataStorageConnectorConfig connectorConfig;
-
-    public DerbyConnectorRule() {
-      this("druidTest" + dbSafeUUID());
-    }
-
-    private DerbyConnectorRule(
-            final String defaultBase
-    ) {
-      this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase)));
-    }
-
-    public DerbyConnectorRule(
-            Supplier<MetadataStorageTablesConfig> dbTables
-    ) {
-      this.dbTables = dbTables;
-      this.connectorConfig = new MetadataStorageConnectorConfig() {
-        @Override
-        public String getConnectURI() {
-          return connector.getJdbcUri();
-        }
-      };
-    }
-
-    @Override
-    protected void before() throws Throwable {
-      connector = new TestDerbyConnector(Suppliers.ofInstance(connectorConfig), dbTables);
-      connector.getDBI().open().close(); // create db
-    }
-
-    @Override
-    protected void after() {
-      connector.tearDown();
-    }
-
-    public TestDerbyConnector getConnector() {
-      return connector;
-    }
-
-    public MetadataStorageConnectorConfig getMetadataConnectorConfig() {
-      return connectorConfig;
-    }
-
-    public Supplier<MetadataStorageTablesConfig> metadataTablesConfigSupplier() {
-      return dbTables;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
new file mode 100644
index 0000000..da6610a
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.indexer.JobHelper;
+import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.Handle;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+public class TestDruidStorageHandler {
+
+  @Rule
+  public final DerbyConnectorTestUtility.DerbyConnectorRule derbyConnectorRule = new DerbyConnectorTestUtility.DerbyConnectorRule();
+
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private static final String DATA_SOURCE_NAME = "testName";
+
+  private String segmentsTable;
+
+  private String tableWorkingPath;
+
+  private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
+          .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
+
+  @Before
+  public void before() throws Throwable {
+    tableWorkingPath = temporaryFolder.newFolder().getAbsolutePath();
+    segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+    Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
+    Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
+    Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0);
+    StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class);
+    Mockito.when(storageDes.getBucketColsSize()).thenReturn(0);
+    Mockito.when(tableMock.getSd()).thenReturn(storageDes);
+    Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME);
+  }
+
+  Table tableMock = Mockito.mock(Table.class);
+
+  @Test
+  public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            null
+    );
+
+    try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
+      Assert.assertFalse(derbyConnectorRule.getConnector()
+              .tableExists(handle,
+                      segmentsTable
+              ));
+      druidStorageHandler.preCreateTable(tableMock);
+      Assert.assertTrue(derbyConnectorRule.getConnector()
+              .tableExists(handle,
+                      segmentsTable
+              ));
+    }
+
+  }
+
+  @Test(expected = MetaException.class)
+  public void testPreCreateTableWhenDataSourceExists() throws MetaException {
+    derbyConnectorRule.getConnector().createSegmentTable();
+    SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(
+            derbyConnectorRule.getConnector());
+    sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment),
+            DruidStorageHandlerUtils.JSON_MAPPER
+    );
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            null
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+  }
+
+  @Test
+  public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
+          throws MetaException, IOException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            null
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+    Configuration config = new Configuration();
+    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+            new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+    druidStorageHandler.commitCreateTable(tableMock);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+                    derbyConnectorRule.metadataTablesConfigSupplier().get()
+            )).toArray());
+    druidStorageHandler.commitDropTable(tableMock, false);
+    Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+                    derbyConnectorRule.metadataTablesConfigSupplier().get()
+            )).toArray());
+
+  }
+
+  @Test
+  public void testCommitInsertTable() throws MetaException, IOException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            null
+    );
+    druidStorageHandler.preCreateTable(tableMock);
+    Configuration config = new Configuration();
+    config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+    config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+    Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+            new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+    );
+    DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+    druidStorageHandler.commitCreateTable(tableMock);
+    Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+            DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+                    derbyConnectorRule.metadataTablesConfigSupplier().get()
+            )).toArray());
+  }
+
+  @Test
+  public void testDeleteSegment() throws IOException, SegmentLoadingException {
+    DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+            derbyConnectorRule.getConnector(),
+            new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+            derbyConnectorRule.metadataTablesConfigSupplier().get(),
+            null
+    );
+
+    String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
+    Configuration config = new Configuration();
+    druidStorageHandler.setConf(config);
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+
+    Path segmentOutputPath = JobHelper
+            .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment);
+    Path indexPath = new Path(segmentOutputPath, "index.zip");
+    DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
+            ImmutableMap.<String, Object>of("path", indexPath)).build();
+    OutputStream outputStream = localFileSystem.create(indexPath, true);
+    outputStream.close();
+    Assert.assertTrue("index file is not created ??", localFileSystem.exists(indexPath));
+    Assert.assertTrue(localFileSystem.exists(segmentOutputPath));
+
+    druidStorageHandler.deleteSegment(dataSegmentWithLoadspect);
+    // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
+    Assert.assertFalse("Index file still there ??", localFileSystem.exists(indexPath));
+    // path format of segmentOutputPath -- > .../dataSource/interval/version/partitionNum/
+    Assert.assertFalse("PartitionNum directory still there ??",
+            localFileSystem.exists(segmentOutputPath)
+    );
+    Assert.assertFalse("Version directory still there ??",
+            localFileSystem.exists(segmentOutputPath.getParent())
+    );
+    Assert.assertFalse("Interval directory still there ??",
+            localFileSystem.exists(segmentOutputPath.getParent().getParent())
+    );
+    Assert.assertFalse("Data source directory still there ??",
+            localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
deleted file mode 100644
index f72a735..0000000
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.metamx.common.Granularity;
-import io.druid.data.input.Firehose;
-import io.druid.data.input.InputRow;
-import io.druid.data.input.impl.DimensionSchema;
-import io.druid.data.input.impl.DimensionsSpec;
-import io.druid.data.input.impl.InputRowParser;
-import io.druid.data.input.impl.MapInputRowParser;
-import io.druid.data.input.impl.StringDimensionSchema;
-import io.druid.data.input.impl.TimeAndDimsParseSpec;
-import io.druid.data.input.impl.TimestampSpec;
-import io.druid.granularity.QueryGranularities;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.LongSumAggregatorFactory;
-import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
-import io.druid.segment.QueryableIndex;
-import io.druid.segment.QueryableIndexStorageAdapter;
-import io.druid.segment.indexing.DataSchema;
-import io.druid.segment.indexing.RealtimeTuningConfig;
-import io.druid.segment.indexing.granularity.UniformGranularitySpec;
-import io.druid.segment.loading.DataSegmentPusher;
-import io.druid.segment.loading.LocalDataSegmentPuller;
-import io.druid.segment.loading.LocalDataSegmentPusher;
-import io.druid.segment.loading.LocalDataSegmentPusherConfig;
-import io.druid.segment.loading.SegmentLoadingException;
-import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
-import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
-import io.druid.timeline.DataSegment;
-import org.apache.calcite.adapter.druid.DruidTable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.druid.DruidStorageHandler;
-import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
-import org.apache.hadoop.hive.druid.serde.DruidWritable;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public class DruidRecordWriterTest {
-  private ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER;
-
-  private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D");
-
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-  private DruidRecordWriter druidRecordWriter;
-
-  final List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
-          ImmutableMap.<String, Object>of(
-                  DruidTable.DEFAULT_TIMESTAMP_COLUMN,
-                  DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
-                  "host", ImmutableList.of("a.example.com"),
-                  "visited_sum", 190L,
-                  "unique_hosts", 1.0d
-          ),
-          ImmutableMap.<String, Object>of(
-                  DruidTable.DEFAULT_TIMESTAMP_COLUMN,
-                  DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
-                  "host", ImmutableList.of("b.example.com"),
-                  "visited_sum", 175L,
-                  "unique_hosts", 1.0d
-          ),
-          ImmutableMap.<String, Object>of(
-                  DruidTable.DEFAULT_TIMESTAMP_COLUMN,
-                  DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
-                  "host", ImmutableList.of("c.example.com"),
-                  "visited_sum", 270L,
-                  "unique_hosts", 1.0d
-          )
-  );
-
-  // This test need this patch https://github.com/druid-io/druid/pull/3483
-  @Ignore
-  @Test
-  public void testWrite() throws IOException, SegmentLoadingException {
-
-    final String dataSourceName = "testDataSource";
-    final File segmentOutputDir = temporaryFolder.newFolder();
-    final File workingDir = temporaryFolder.newFolder();
-    Configuration config = new Configuration();
-
-    final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
-            new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
-            new DimensionsSpec(ImmutableList.<DimensionSchema>of(new StringDimensionSchema("host")),
-                    null, null
-            )
-    ));
-    final Map<String, Object> parserMap = objectMapper.convertValue(inputRowParser, Map.class);
-
-    DataSchema dataSchema = new DataSchema(
-            dataSourceName,
-            parserMap,
-            new AggregatorFactory[] {
-                    new LongSumAggregatorFactory("visited_sum", "visited_sum"),
-                    new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts")
-            },
-            new UniformGranularitySpec(
-                    Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(INTERVAL_FULL)
-            ),
-            objectMapper
-    );
-
-    RealtimeTuningConfig tuningConfig = RealtimeTuningConfig
-            .makeDefaultTuningConfig(temporaryFolder.newFolder());
-    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
-    DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
-            new LocalDataSegmentPusherConfig() {
-              @Override
-              public File getStorageDirectory() {return segmentOutputDir;}
-            }, objectMapper);
-
-    Path segmentDescriptroPath = new Path(workingDir.getAbsolutePath(),
-            DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME
-    );
-    druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20,
-            segmentDescriptroPath, localFileSystem
-    );
-
-    List<DruidWritable> druidWritables = Lists.transform(expectedRows,
-            new Function<ImmutableMap<String, Object>, DruidWritable>() {
-              @Nullable
-              @Override
-              public DruidWritable apply(@Nullable ImmutableMap<String, Object> input
-              ) {
-                return new DruidWritable(ImmutableMap.<String, Object>builder().putAll(input)
-                        .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
-                                Granularity.DAY.truncate(
-                                        new DateTime((long) input
-                                                .get(DruidTable.DEFAULT_TIMESTAMP_COLUMN)))
-                                        .getMillis()
-                        ).build());
-              }
-            }
-    );
-    for (DruidWritable druidWritable : druidWritables) {
-      druidRecordWriter.write(druidWritable);
-    }
-    druidRecordWriter.close(false);
-    List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
-            .getPublishedSegments(segmentDescriptroPath, config);
-    Assert.assertEquals(1, dataSegmentList.size());
-    File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
-    new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir);
-    final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO
-            .loadIndex(tmpUnzippedSegmentDir);
-
-    QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(queryableIndex);
-
-    Firehose firehose = new IngestSegmentFirehose(
-            ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
-            ImmutableList.of("host"),
-            ImmutableList.of("visited_sum", "unique_hosts"),
-            null,
-            QueryGranularities.NONE
-    );
-
-    List<InputRow> rows = Lists.newArrayList();
-    while (firehose.hasMore()) {
-      rows.add(firehose.nextRow());
-    }
-
-    verifyRows(expectedRows, rows);
-
-  }
-
-  private void verifyRows(List<ImmutableMap<String, Object>> expectedRows,
-          List<InputRow> actualRows
-  ) {
-    System.out.println("actualRows = " + actualRows);
-    Assert.assertEquals(expectedRows.size(), actualRows.size());
-
-    for (int i = 0; i < expectedRows.size(); i++) {
-      Map<String, Object> expected = expectedRows.get(i);
-      InputRow actual = actualRows.get(i);
-
-      Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
-
-      Assert.assertEquals(expected.get(DruidTable.DEFAULT_TIMESTAMP_COLUMN),
-              actual.getTimestamp().getMillis()
-      );
-      Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
-      Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
-      Assert.assertEquals(
-              (Double) expected.get("unique_hosts"),
-              (Double) HyperUniquesAggregatorFactory
-                      .estimateCardinality(actual.getRaw("unique_hosts")),
-              0.001
-      );
-    }
-  }
-
-  @Test
-  public void testSerDesr() throws IOException {
-    String segment = "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}";
-    DataSegment dataSegment = objectMapper.reader(DataSegment.class)
-            .readValue(segment);
-    Assert.assertTrue(dataSegment.getDataSource().equals("datasource2015"));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
new file mode 100644
index 0000000..9ec82c0
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.MapInputRowParser;
+import io.druid.data.input.impl.StringDimensionSchema;
+import io.druid.data.input.impl.TimeAndDimsParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.QueryableIndexStorageAdapter;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentPuller;
+import io.druid.segment.loading.LocalDataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentPusherConfig;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
+import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
+import io.druid.timeline.DataSegment;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class TestDruidRecordWriter {
+  private ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER;
+
+  private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D");
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private DruidRecordWriter druidRecordWriter;
+
+  final List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
+          ImmutableMap.<String, Object>of(
+                  DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+                  DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
+                  "host", ImmutableList.of("a.example.com"),
+                  "visited_sum", 190L,
+                  "unique_hosts", 1.0d
+          ),
+          ImmutableMap.<String, Object>of(
+                  DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+                  DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
+                  "host", ImmutableList.of("b.example.com"),
+                  "visited_sum", 175L,
+                  "unique_hosts", 1.0d
+          ),
+          ImmutableMap.<String, Object>of(
+                  DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+                  DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
+                  "host", ImmutableList.of("c.example.com"),
+                  "visited_sum", 270L,
+                  "unique_hosts", 1.0d
+          )
+  );
+
+  // This test needs this patch: https://github.com/druid-io/druid/pull/3483
+  @Ignore
+  @Test
+  public void testWrite() throws IOException, SegmentLoadingException {
+
+    final String dataSourceName = "testDataSource";
+    final File segmentOutputDir = temporaryFolder.newFolder();
+    final File workingDir = temporaryFolder.newFolder();
+    Configuration config = new Configuration();
+
+    final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
+            new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
+            new DimensionsSpec(ImmutableList.<DimensionSchema>of(new StringDimensionSchema("host")),
+                    null, null
+            )
+    ));
+    final Map<String, Object> parserMap = objectMapper.convertValue(inputRowParser, Map.class);
+
+    DataSchema dataSchema = new DataSchema(
+            dataSourceName,
+            parserMap,
+            new AggregatorFactory[] {
+                    new LongSumAggregatorFactory("visited_sum", "visited_sum"),
+                    new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts")
+            },
+            new UniformGranularitySpec(
+                    Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(INTERVAL_FULL)
+            ),
+            objectMapper
+    );
+
+    RealtimeTuningConfig tuningConfig = RealtimeTuningConfig
+            .makeDefaultTuningConfig(temporaryFolder.newFolder());
+    LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+    DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
+            new LocalDataSegmentPusherConfig() {
+              @Override
+              public File getStorageDirectory() {return segmentOutputDir;}
+            }, objectMapper);
+
+    Path segmentDescriptroPath = new Path(workingDir.getAbsolutePath(),
+            DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME
+    );
+    druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20,
+            segmentDescriptroPath, localFileSystem
+    );
+
+    List<DruidWritable> druidWritables = Lists.transform(expectedRows,
+            new Function<ImmutableMap<String, Object>, DruidWritable>() {
+              @Nullable
+              @Override
+              public DruidWritable apply(@Nullable ImmutableMap<String, Object> input
+              ) {
+                return new DruidWritable(ImmutableMap.<String, Object>builder().putAll(input)
+                        .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+                                Granularity.DAY.truncate(
+                                        new DateTime((long) input
+                                                .get(DruidTable.DEFAULT_TIMESTAMP_COLUMN)))
+                                        .getMillis()
+                        ).build());
+              }
+            }
+    );
+    for (DruidWritable druidWritable : druidWritables) {
+      druidRecordWriter.write(druidWritable);
+    }
+    druidRecordWriter.close(false);
+    List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+            .getPublishedSegments(segmentDescriptroPath, config);
+    Assert.assertEquals(1, dataSegmentList.size());
+    File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
+    new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir);
+    final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO
+            .loadIndex(tmpUnzippedSegmentDir);
+
+    QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(queryableIndex);
+
+    Firehose firehose = new IngestSegmentFirehose(
+            ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
+            ImmutableList.of("host"),
+            ImmutableList.of("visited_sum", "unique_hosts"),
+            null,
+            QueryGranularities.NONE
+    );
+
+    List<InputRow> rows = Lists.newArrayList();
+    while (firehose.hasMore()) {
+      rows.add(firehose.nextRow());
+    }
+
+    verifyRows(expectedRows, rows);
+
+  }
+
+  private void verifyRows(List<ImmutableMap<String, Object>> expectedRows,
+          List<InputRow> actualRows
+  ) {
+    System.out.println("actualRows = " + actualRows);
+    Assert.assertEquals(expectedRows.size(), actualRows.size());
+
+    for (int i = 0; i < expectedRows.size(); i++) {
+      Map<String, Object> expected = expectedRows.get(i);
+      InputRow actual = actualRows.get(i);
+
+      Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
+
+      Assert.assertEquals(expected.get(DruidTable.DEFAULT_TIMESTAMP_COLUMN),
+              actual.getTimestamp().getMillis()
+      );
+      Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
+      Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
+      Assert.assertEquals(
+              (Double) expected.get("unique_hosts"),
+              (Double) HyperUniquesAggregatorFactory
+                      .estimateCardinality(actual.getRaw("unique_hosts")),
+              0.001
+      );
+    }
+  }
+
+  @Test
+  public void testSerDesr() throws IOException {
+    String segment = "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}";
+    DataSegment dataSegment = objectMapper.reader(DataSegment.class)
+            .readValue(segment);
+    Assert.assertTrue(dataSegment.getDataSource().equals("datasource2015"));
+  }
+
+}