You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2017/02/25 19:54:58 UTC
hive git commit: HIVE-15702 : Test timeout : TestDerbyConnector (Slim
Bouguerra via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 10449a7af -> 2aa054a4d
HIVE-15702 : Test timeout : TestDerbyConnector (Slim Bouguerra via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2aa054a4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2aa054a4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2aa054a4
Branch: refs/heads/master
Commit: 2aa054a4d5aaf516839a3574cf600765288d98b0
Parents: 10449a7
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Sat Feb 25 11:54:06 2017 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sat Feb 25 11:54:06 2017 -0800
----------------------------------------------------------------------
.../hive/druid/DerbyConnectorTestUtility.java | 126 ++++++++++
.../hive/druid/DruidStorageHandlerTest.java | 224 -----------------
.../hadoop/hive/druid/TestDerbyConnector.java | 126 ----------
.../hive/druid/TestDruidStorageHandler.java | 224 +++++++++++++++++
.../hive/ql/io/DruidRecordWriterTest.java | 239 -------------------
.../hive/ql/io/TestDruidRecordWriter.java | 239 +++++++++++++++++++
6 files changed, 589 insertions(+), 589 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java
new file mode 100644
index 0000000..f9304a5
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/DerbyConnectorTestUtility.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import io.druid.metadata.MetadataStorageConnectorConfig;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import io.druid.metadata.storage.derby.DerbyConnector;
+import org.junit.Assert;
+import org.junit.rules.ExternalResource;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+
+import java.sql.SQLException;
+import java.util.UUID;
+
+public class DerbyConnectorTestUtility extends DerbyConnector {
+ private final String jdbcUri;
+
+ public DerbyConnectorTestUtility(
+ Supplier<MetadataStorageConnectorConfig> config,
+ Supplier<MetadataStorageTablesConfig> dbTables
+ ) {
+ this(config, dbTables, "jdbc:derby:memory:druidTest" + dbSafeUUID());
+ }
+
+ protected DerbyConnectorTestUtility(
+ Supplier<MetadataStorageConnectorConfig> config,
+ Supplier<MetadataStorageTablesConfig> dbTables,
+ String jdbcUri
+ ) {
+ super(config, dbTables, new DBI(jdbcUri + ";create=true"));
+ this.jdbcUri = jdbcUri;
+ }
+
+ public void tearDown() {
+ try {
+ new DBI(jdbcUri + ";drop=true").open().close();
+ } catch (UnableToObtainConnectionException e) {
+ SQLException cause = (SQLException) e.getCause();
+ // error code "08006" indicates proper shutdown
+ Assert.assertEquals(String.format("Derby not shutdown: [%s]", cause.toString()), "08006",
+ cause.getSQLState()
+ );
+ }
+ }
+
+ public static String dbSafeUUID() {
+ return UUID.randomUUID().toString().replace("-", "");
+ }
+
+ public String getJdbcUri() {
+ return jdbcUri;
+ }
+
+ public static class DerbyConnectorRule extends ExternalResource {
+ private DerbyConnectorTestUtility connector;
+
+ private final Supplier<MetadataStorageTablesConfig> dbTables;
+
+ private final MetadataStorageConnectorConfig connectorConfig;
+
+ public DerbyConnectorRule() {
+ this("druidTest" + dbSafeUUID());
+ }
+
+ private DerbyConnectorRule(
+ final String defaultBase
+ ) {
+ this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase)));
+ }
+
+ public DerbyConnectorRule(
+ Supplier<MetadataStorageTablesConfig> dbTables
+ ) {
+ this.dbTables = dbTables;
+ this.connectorConfig = new MetadataStorageConnectorConfig() {
+ @Override
+ public String getConnectURI() {
+ return connector.getJdbcUri();
+ }
+ };
+ }
+
+ @Override
+ protected void before() throws Throwable {
+ connector = new DerbyConnectorTestUtility(Suppliers.ofInstance(connectorConfig), dbTables);
+ connector.getDBI().open().close(); // create db
+ }
+
+ @Override
+ protected void after() {
+ connector.tearDown();
+ }
+
+ public DerbyConnectorTestUtility getConnector() {
+ return connector;
+ }
+
+ public MetadataStorageConnectorConfig getMetadataConnectorConfig() {
+ return connectorConfig;
+ }
+
+ public Supplier<MetadataStorageTablesConfig> metadataTablesConfigSupplier() {
+ return dbTables;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
deleted file mode 100644
index 3573bf9..0000000
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.druid;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import io.druid.indexer.JobHelper;
-import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
-import io.druid.segment.loading.SegmentLoadingException;
-import io.druid.timeline.DataSegment;
-import io.druid.timeline.partition.NoneShardSpec;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
-import org.skife.jdbi.v2.Handle;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.UUID;
-
-public class DruidStorageHandlerTest {
-
- @Rule
- public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
-
- @Rule
- public final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- private static final String DATA_SOURCE_NAME = "testName";
-
- private String segmentsTable;
-
- private String tableWorkingPath;
-
- private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
- .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
-
- @Before
- public void before() throws Throwable {
- tableWorkingPath = temporaryFolder.newFolder().getAbsolutePath();
- segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
- Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
- Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
- Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0);
- StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class);
- Mockito.when(storageDes.getBucketColsSize()).thenReturn(0);
- Mockito.when(tableMock.getSd()).thenReturn(storageDes);
- Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME);
- }
-
- Table tableMock = Mockito.mock(Table.class);
-
- @Test
- public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
- DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
- derbyConnectorRule.getConnector(),
- new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- null
- );
-
- try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
- Assert.assertFalse(derbyConnectorRule.getConnector()
- .tableExists(handle,
- segmentsTable
- ));
- druidStorageHandler.preCreateTable(tableMock);
- Assert.assertTrue(derbyConnectorRule.getConnector()
- .tableExists(handle,
- segmentsTable
- ));
- }
-
- }
-
- @Test(expected = MetaException.class)
- public void testPreCreateTableWhenDataSourceExists() throws MetaException {
- derbyConnectorRule.getConnector().createSegmentTable();
- SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(
- derbyConnectorRule.getConnector());
- sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment),
- DruidStorageHandlerUtils.JSON_MAPPER
- );
- DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
- derbyConnectorRule.getConnector(),
- new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- null
- );
- druidStorageHandler.preCreateTable(tableMock);
- }
-
- @Test
- public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
- throws MetaException, IOException {
- DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
- derbyConnectorRule.getConnector(),
- new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- null
- );
- druidStorageHandler.preCreateTable(tableMock);
- Configuration config = new Configuration();
- config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
- config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
- druidStorageHandler.setConf(config);
- LocalFileSystem localFileSystem = FileSystem.getLocal(config);
- Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
- Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
- new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
- );
- DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
- druidStorageHandler.commitCreateTable(tableMock);
- Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
- DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
- derbyConnectorRule.metadataTablesConfigSupplier().get()
- )).toArray());
- druidStorageHandler.commitDropTable(tableMock, false);
- Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
- DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
- derbyConnectorRule.metadataTablesConfigSupplier().get()
- )).toArray());
-
- }
-
- @Test
- public void testCommitInsertTable() throws MetaException, IOException {
- DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
- derbyConnectorRule.getConnector(),
- new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- null
- );
- druidStorageHandler.preCreateTable(tableMock);
- Configuration config = new Configuration();
- config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
- config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
- druidStorageHandler.setConf(config);
- LocalFileSystem localFileSystem = FileSystem.getLocal(config);
- Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
- Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
- new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
- );
- DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
- druidStorageHandler.commitCreateTable(tableMock);
- Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
- DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
- derbyConnectorRule.metadataTablesConfigSupplier().get()
- )).toArray());
- }
-
- @Test
- public void testDeleteSegment() throws IOException, SegmentLoadingException {
- DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
- derbyConnectorRule.getConnector(),
- new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
- derbyConnectorRule.metadataTablesConfigSupplier().get(),
- null
- );
-
- String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
- Configuration config = new Configuration();
- druidStorageHandler.setConf(config);
- LocalFileSystem localFileSystem = FileSystem.getLocal(config);
-
- Path segmentOutputPath = JobHelper
- .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment);
- Path indexPath = new Path(segmentOutputPath, "index.zip");
- DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
- ImmutableMap.<String, Object>of("path", indexPath)).build();
- OutputStream outputStream = localFileSystem.create(indexPath, true);
- outputStream.close();
- Assert.assertTrue("index file is not created ??", localFileSystem.exists(indexPath));
- Assert.assertTrue(localFileSystem.exists(segmentOutputPath));
-
- druidStorageHandler.deleteSegment(dataSegmentWithLoadspect);
- // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
- Assert.assertFalse("Index file still there ??", localFileSystem.exists(indexPath));
- // path format of segmentOutputPath -- > .../dataSource/interval/version/partitionNum/
- Assert.assertFalse("PartitionNum directory still there ??",
- localFileSystem.exists(segmentOutputPath)
- );
- Assert.assertFalse("Version directory still there ??",
- localFileSystem.exists(segmentOutputPath.getParent())
- );
- Assert.assertFalse("Interval directory still there ??",
- localFileSystem.exists(segmentOutputPath.getParent().getParent())
- );
- Assert.assertFalse("Data source directory still there ??",
- localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
deleted file mode 100644
index 1014ab6..0000000
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDerbyConnector.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.druid;
-
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import io.druid.metadata.MetadataStorageConnectorConfig;
-import io.druid.metadata.MetadataStorageTablesConfig;
-import io.druid.metadata.storage.derby.DerbyConnector;
-import org.junit.Assert;
-import org.junit.rules.ExternalResource;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
-
-import java.sql.SQLException;
-import java.util.UUID;
-
-public class TestDerbyConnector extends DerbyConnector {
- private final String jdbcUri;
-
- public TestDerbyConnector(
- Supplier<MetadataStorageConnectorConfig> config,
- Supplier<MetadataStorageTablesConfig> dbTables
- ) {
- this(config, dbTables, "jdbc:derby:memory:druidTest" + dbSafeUUID());
- }
-
- protected TestDerbyConnector(
- Supplier<MetadataStorageConnectorConfig> config,
- Supplier<MetadataStorageTablesConfig> dbTables,
- String jdbcUri
- ) {
- super(config, dbTables, new DBI(jdbcUri + ";create=true"));
- this.jdbcUri = jdbcUri;
- }
-
- public void tearDown() {
- try {
- new DBI(jdbcUri + ";drop=true").open().close();
- } catch (UnableToObtainConnectionException e) {
- SQLException cause = (SQLException) e.getCause();
- // error code "08006" indicates proper shutdown
- Assert.assertEquals(String.format("Derby not shutdown: [%s]", cause.toString()), "08006",
- cause.getSQLState()
- );
- }
- }
-
- public static String dbSafeUUID() {
- return UUID.randomUUID().toString().replace("-", "");
- }
-
- public String getJdbcUri() {
- return jdbcUri;
- }
-
- public static class DerbyConnectorRule extends ExternalResource {
- private TestDerbyConnector connector;
-
- private final Supplier<MetadataStorageTablesConfig> dbTables;
-
- private final MetadataStorageConnectorConfig connectorConfig;
-
- public DerbyConnectorRule() {
- this("druidTest" + dbSafeUUID());
- }
-
- private DerbyConnectorRule(
- final String defaultBase
- ) {
- this(Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(defaultBase)));
- }
-
- public DerbyConnectorRule(
- Supplier<MetadataStorageTablesConfig> dbTables
- ) {
- this.dbTables = dbTables;
- this.connectorConfig = new MetadataStorageConnectorConfig() {
- @Override
- public String getConnectURI() {
- return connector.getJdbcUri();
- }
- };
- }
-
- @Override
- protected void before() throws Throwable {
- connector = new TestDerbyConnector(Suppliers.ofInstance(connectorConfig), dbTables);
- connector.getDBI().open().close(); // create db
- }
-
- @Override
- protected void after() {
- connector.tearDown();
- }
-
- public TestDerbyConnector getConnector() {
- return connector;
- }
-
- public MetadataStorageConnectorConfig getMetadataConnectorConfig() {
- return connectorConfig;
- }
-
- public Supplier<MetadataStorageTablesConfig> metadataTablesConfigSupplier() {
- return dbTables;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
new file mode 100644
index 0000000..da6610a
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.indexer.JobHelper;
+import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.skife.jdbi.v2.Handle;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+public class TestDruidStorageHandler {
+
+ @Rule
+ public final DerbyConnectorTestUtility.DerbyConnectorRule derbyConnectorRule = new DerbyConnectorTestUtility.DerbyConnectorRule();
+
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final String DATA_SOURCE_NAME = "testName";
+
+ private String segmentsTable;
+
+ private String tableWorkingPath;
+
+ private DataSegment dataSegment = DataSegment.builder().dataSource(DATA_SOURCE_NAME).version("v1")
+ .interval(new Interval(100, 170)).shardSpec(NoneShardSpec.instance()).build();
+
+ @Before
+ public void before() throws Throwable {
+ tableWorkingPath = temporaryFolder.newFolder().getAbsolutePath();
+ segmentsTable = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
+ Map<String, String> mockMap = ImmutableMap.of(Constants.DRUID_DATA_SOURCE, DATA_SOURCE_NAME);
+ Mockito.when(tableMock.getParameters()).thenReturn(mockMap);
+ Mockito.when(tableMock.getPartitionKeysSize()).thenReturn(0);
+ StorageDescriptor storageDes = Mockito.mock(StorageDescriptor.class);
+ Mockito.when(storageDes.getBucketColsSize()).thenReturn(0);
+ Mockito.when(tableMock.getSd()).thenReturn(storageDes);
+ Mockito.when(tableMock.getDbName()).thenReturn(DATA_SOURCE_NAME);
+ }
+
+ Table tableMock = Mockito.mock(Table.class);
+
+ @Test
+ public void testPreCreateTableWillCreateSegmentsTable() throws MetaException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ null
+ );
+
+ try (Handle handle = derbyConnectorRule.getConnector().getDBI().open()) {
+ Assert.assertFalse(derbyConnectorRule.getConnector()
+ .tableExists(handle,
+ segmentsTable
+ ));
+ druidStorageHandler.preCreateTable(tableMock);
+ Assert.assertTrue(derbyConnectorRule.getConnector()
+ .tableExists(handle,
+ segmentsTable
+ ));
+ }
+
+ }
+
+ @Test(expected = MetaException.class)
+ public void testPreCreateTableWhenDataSourceExists() throws MetaException {
+ derbyConnectorRule.getConnector().createSegmentTable();
+ SQLMetadataStorageUpdaterJobHandler sqlMetadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(
+ derbyConnectorRule.getConnector());
+ sqlMetadataStorageUpdaterJobHandler.publishSegments(segmentsTable, Arrays.asList(dataSegment),
+ DruidStorageHandlerUtils.JSON_MAPPER
+ );
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ null
+ );
+ druidStorageHandler.preCreateTable(tableMock);
+ }
+
+ @Test
+ public void testCommitCreateTablePlusCommitDropTableWithoutPurge()
+ throws MetaException, IOException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ null
+ );
+ druidStorageHandler.preCreateTable(tableMock);
+ Configuration config = new Configuration();
+ config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+ config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
+ druidStorageHandler.setConf(config);
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+ Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+ Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+ new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+ );
+ DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+ druidStorageHandler.commitCreateTable(tableMock);
+ Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+ DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ )).toArray());
+ druidStorageHandler.commitDropTable(tableMock, false);
+ Assert.assertArrayEquals(Lists.newArrayList().toArray(), Lists.newArrayList(
+ DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ )).toArray());
+
+ }
+
+ @Test
+ public void testCommitInsertTable() throws MetaException, IOException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ null
+ );
+ druidStorageHandler.preCreateTable(tableMock);
+ Configuration config = new Configuration();
+ config.set(String.valueOf(HiveConf.ConfVars.HIVEQUERYID), UUID.randomUUID().toString());
+ config.set(String.valueOf(HiveConf.ConfVars.DRUID_WORKING_DIR), tableWorkingPath);
+ druidStorageHandler.setConf(config);
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+ Path taskDirPath = new Path(tableWorkingPath, druidStorageHandler.makeStagingName());
+ Path descriptorPath = DruidStorageHandlerUtils.makeSegmentDescriptorOutputPath(dataSegment,
+ new Path(taskDirPath, DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME)
+ );
+ DruidStorageHandlerUtils.writeSegmentDescriptor(localFileSystem, dataSegment, descriptorPath);
+ druidStorageHandler.commitCreateTable(tableMock);
+ Assert.assertArrayEquals(Lists.newArrayList(DATA_SOURCE_NAME).toArray(), Lists.newArrayList(
+ DruidStorageHandlerUtils.getAllDataSourceNames(derbyConnectorRule.getConnector(),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ )).toArray());
+ }
+
+ @Test
+ public void testDeleteSegment() throws IOException, SegmentLoadingException {
+ DruidStorageHandler druidStorageHandler = new DruidStorageHandler(
+ derbyConnectorRule.getConnector(),
+ new SQLMetadataStorageUpdaterJobHandler(derbyConnectorRule.getConnector()),
+ derbyConnectorRule.metadataTablesConfigSupplier().get(),
+ null
+ );
+
+ String segmentRootPath = temporaryFolder.newFolder().getAbsolutePath();
+ Configuration config = new Configuration();
+ druidStorageHandler.setConf(config);
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+
+ Path segmentOutputPath = JobHelper
+ .makeSegmentOutputPath(new Path(segmentRootPath), localFileSystem, dataSegment);
+ Path indexPath = new Path(segmentOutputPath, "index.zip");
+ DataSegment dataSegmentWithLoadspect = DataSegment.builder(dataSegment).loadSpec(
+ ImmutableMap.<String, Object>of("path", indexPath)).build();
+ OutputStream outputStream = localFileSystem.create(indexPath, true);
+ outputStream.close();
+ Assert.assertTrue("index file is not created ??", localFileSystem.exists(indexPath));
+ Assert.assertTrue(localFileSystem.exists(segmentOutputPath));
+
+ druidStorageHandler.deleteSegment(dataSegmentWithLoadspect);
+ // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
+ Assert.assertFalse("Index file still there ??", localFileSystem.exists(indexPath));
+ // path format of segmentOutputPath -- > .../dataSource/interval/version/partitionNum/
+ Assert.assertFalse("PartitionNum directory still there ??",
+ localFileSystem.exists(segmentOutputPath)
+ );
+ Assert.assertFalse("Version directory still there ??",
+ localFileSystem.exists(segmentOutputPath.getParent())
+ );
+ Assert.assertFalse("Interval directory still there ??",
+ localFileSystem.exists(segmentOutputPath.getParent().getParent())
+ );
+ Assert.assertFalse("Data source directory still there ??",
+ localFileSystem.exists(segmentOutputPath.getParent().getParent().getParent())
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
deleted file mode 100644
index f72a735..0000000
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/DruidRecordWriterTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.metamx.common.Granularity;
-import io.druid.data.input.Firehose;
-import io.druid.data.input.InputRow;
-import io.druid.data.input.impl.DimensionSchema;
-import io.druid.data.input.impl.DimensionsSpec;
-import io.druid.data.input.impl.InputRowParser;
-import io.druid.data.input.impl.MapInputRowParser;
-import io.druid.data.input.impl.StringDimensionSchema;
-import io.druid.data.input.impl.TimeAndDimsParseSpec;
-import io.druid.data.input.impl.TimestampSpec;
-import io.druid.granularity.QueryGranularities;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.LongSumAggregatorFactory;
-import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
-import io.druid.segment.QueryableIndex;
-import io.druid.segment.QueryableIndexStorageAdapter;
-import io.druid.segment.indexing.DataSchema;
-import io.druid.segment.indexing.RealtimeTuningConfig;
-import io.druid.segment.indexing.granularity.UniformGranularitySpec;
-import io.druid.segment.loading.DataSegmentPusher;
-import io.druid.segment.loading.LocalDataSegmentPuller;
-import io.druid.segment.loading.LocalDataSegmentPusher;
-import io.druid.segment.loading.LocalDataSegmentPusherConfig;
-import io.druid.segment.loading.SegmentLoadingException;
-import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
-import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
-import io.druid.timeline.DataSegment;
-import org.apache.calcite.adapter.druid.DruidTable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.Constants;
-import org.apache.hadoop.hive.druid.DruidStorageHandler;
-import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
-import org.apache.hadoop.hive.druid.serde.DruidWritable;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public class DruidRecordWriterTest {
- private ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER;
-
- private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D");
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- private DruidRecordWriter druidRecordWriter;
-
- final List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
- ImmutableMap.<String, Object>of(
- DruidTable.DEFAULT_TIMESTAMP_COLUMN,
- DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
- "host", ImmutableList.of("a.example.com"),
- "visited_sum", 190L,
- "unique_hosts", 1.0d
- ),
- ImmutableMap.<String, Object>of(
- DruidTable.DEFAULT_TIMESTAMP_COLUMN,
- DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
- "host", ImmutableList.of("b.example.com"),
- "visited_sum", 175L,
- "unique_hosts", 1.0d
- ),
- ImmutableMap.<String, Object>of(
- DruidTable.DEFAULT_TIMESTAMP_COLUMN,
- DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
- "host", ImmutableList.of("c.example.com"),
- "visited_sum", 270L,
- "unique_hosts", 1.0d
- )
- );
-
- // This test need this patch https://github.com/druid-io/druid/pull/3483
- @Ignore
- @Test
- public void testWrite() throws IOException, SegmentLoadingException {
-
- final String dataSourceName = "testDataSource";
- final File segmentOutputDir = temporaryFolder.newFolder();
- final File workingDir = temporaryFolder.newFolder();
- Configuration config = new Configuration();
-
- final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
- new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
- new DimensionsSpec(ImmutableList.<DimensionSchema>of(new StringDimensionSchema("host")),
- null, null
- )
- ));
- final Map<String, Object> parserMap = objectMapper.convertValue(inputRowParser, Map.class);
-
- DataSchema dataSchema = new DataSchema(
- dataSourceName,
- parserMap,
- new AggregatorFactory[] {
- new LongSumAggregatorFactory("visited_sum", "visited_sum"),
- new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts")
- },
- new UniformGranularitySpec(
- Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(INTERVAL_FULL)
- ),
- objectMapper
- );
-
- RealtimeTuningConfig tuningConfig = RealtimeTuningConfig
- .makeDefaultTuningConfig(temporaryFolder.newFolder());
- LocalFileSystem localFileSystem = FileSystem.getLocal(config);
- DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
- new LocalDataSegmentPusherConfig() {
- @Override
- public File getStorageDirectory() {return segmentOutputDir;}
- }, objectMapper);
-
- Path segmentDescriptroPath = new Path(workingDir.getAbsolutePath(),
- DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME
- );
- druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20,
- segmentDescriptroPath, localFileSystem
- );
-
- List<DruidWritable> druidWritables = Lists.transform(expectedRows,
- new Function<ImmutableMap<String, Object>, DruidWritable>() {
- @Nullable
- @Override
- public DruidWritable apply(@Nullable ImmutableMap<String, Object> input
- ) {
- return new DruidWritable(ImmutableMap.<String, Object>builder().putAll(input)
- .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
- Granularity.DAY.truncate(
- new DateTime((long) input
- .get(DruidTable.DEFAULT_TIMESTAMP_COLUMN)))
- .getMillis()
- ).build());
- }
- }
- );
- for (DruidWritable druidWritable : druidWritables) {
- druidRecordWriter.write(druidWritable);
- }
- druidRecordWriter.close(false);
- List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
- .getPublishedSegments(segmentDescriptroPath, config);
- Assert.assertEquals(1, dataSegmentList.size());
- File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
- new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir);
- final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO
- .loadIndex(tmpUnzippedSegmentDir);
-
- QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(queryableIndex);
-
- Firehose firehose = new IngestSegmentFirehose(
- ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
- ImmutableList.of("host"),
- ImmutableList.of("visited_sum", "unique_hosts"),
- null,
- QueryGranularities.NONE
- );
-
- List<InputRow> rows = Lists.newArrayList();
- while (firehose.hasMore()) {
- rows.add(firehose.nextRow());
- }
-
- verifyRows(expectedRows, rows);
-
- }
-
- private void verifyRows(List<ImmutableMap<String, Object>> expectedRows,
- List<InputRow> actualRows
- ) {
- System.out.println("actualRows = " + actualRows);
- Assert.assertEquals(expectedRows.size(), actualRows.size());
-
- for (int i = 0; i < expectedRows.size(); i++) {
- Map<String, Object> expected = expectedRows.get(i);
- InputRow actual = actualRows.get(i);
-
- Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
-
- Assert.assertEquals(expected.get(DruidTable.DEFAULT_TIMESTAMP_COLUMN),
- actual.getTimestamp().getMillis()
- );
- Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
- Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
- Assert.assertEquals(
- (Double) expected.get("unique_hosts"),
- (Double) HyperUniquesAggregatorFactory
- .estimateCardinality(actual.getRaw("unique_hosts")),
- 0.001
- );
- }
- }
-
- @Test
- public void testSerDesr() throws IOException {
- String segment = "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}";
- DataSegment dataSegment = objectMapper.reader(DataSegment.class)
- .readValue(segment);
- Assert.assertTrue(dataSegment.getDataSource().equals("datasource2015"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/2aa054a4/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
new file mode 100644
index 0000000..9ec82c0
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.metamx.common.Granularity;
+import io.druid.data.input.Firehose;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.MapInputRowParser;
+import io.druid.data.input.impl.StringDimensionSchema;
+import io.druid.data.input.impl.TimeAndDimsParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularities;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.QueryableIndexStorageAdapter;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentPuller;
+import io.druid.segment.loading.LocalDataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentPusherConfig;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
+import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
+import io.druid.timeline.DataSegment;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.druid.DruidStorageHandler;
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
+import org.apache.hadoop.hive.druid.serde.DruidWritable;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class TestDruidRecordWriter {
+ private ObjectMapper objectMapper = DruidStorageHandlerUtils.JSON_MAPPER;
+
+ private static final Interval INTERVAL_FULL = new Interval("2014-10-22T00:00:00Z/P1D");
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private DruidRecordWriter druidRecordWriter;
+
+ final List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
+ ImmutableMap.<String, Object>of(
+ DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+ DateTime.parse("2014-10-22T00:00:00.000Z").getMillis(),
+ "host", ImmutableList.of("a.example.com"),
+ "visited_sum", 190L,
+ "unique_hosts", 1.0d
+ ),
+ ImmutableMap.<String, Object>of(
+ DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+ DateTime.parse("2014-10-22T01:00:00.000Z").getMillis(),
+ "host", ImmutableList.of("b.example.com"),
+ "visited_sum", 175L,
+ "unique_hosts", 1.0d
+ ),
+ ImmutableMap.<String, Object>of(
+ DruidTable.DEFAULT_TIMESTAMP_COLUMN,
+ DateTime.parse("2014-10-22T02:00:00.000Z").getMillis(),
+ "host", ImmutableList.of("c.example.com"),
+ "visited_sum", 270L,
+ "unique_hosts", 1.0d
+ )
+ );
+
+ // This test need this patch https://github.com/druid-io/druid/pull/3483
+ @Ignore
+ @Test
+ public void testWrite() throws IOException, SegmentLoadingException {
+
+ final String dataSourceName = "testDataSource";
+ final File segmentOutputDir = temporaryFolder.newFolder();
+ final File workingDir = temporaryFolder.newFolder();
+ Configuration config = new Configuration();
+
+ final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec(
+ new TimestampSpec(DruidTable.DEFAULT_TIMESTAMP_COLUMN, "auto", null),
+ new DimensionsSpec(ImmutableList.<DimensionSchema>of(new StringDimensionSchema("host")),
+ null, null
+ )
+ ));
+ final Map<String, Object> parserMap = objectMapper.convertValue(inputRowParser, Map.class);
+
+ DataSchema dataSchema = new DataSchema(
+ dataSourceName,
+ parserMap,
+ new AggregatorFactory[] {
+ new LongSumAggregatorFactory("visited_sum", "visited_sum"),
+ new HyperUniquesAggregatorFactory("unique_hosts", "unique_hosts")
+ },
+ new UniformGranularitySpec(
+ Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(INTERVAL_FULL)
+ ),
+ objectMapper
+ );
+
+ RealtimeTuningConfig tuningConfig = RealtimeTuningConfig
+ .makeDefaultTuningConfig(temporaryFolder.newFolder());
+ LocalFileSystem localFileSystem = FileSystem.getLocal(config);
+ DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(
+ new LocalDataSegmentPusherConfig() {
+ @Override
+ public File getStorageDirectory() {return segmentOutputDir;}
+ }, objectMapper);
+
+ Path segmentDescriptroPath = new Path(workingDir.getAbsolutePath(),
+ DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME
+ );
+ druidRecordWriter = new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20,
+ segmentDescriptroPath, localFileSystem
+ );
+
+ List<DruidWritable> druidWritables = Lists.transform(expectedRows,
+ new Function<ImmutableMap<String, Object>, DruidWritable>() {
+ @Nullable
+ @Override
+ public DruidWritable apply(@Nullable ImmutableMap<String, Object> input
+ ) {
+ return new DruidWritable(ImmutableMap.<String, Object>builder().putAll(input)
+ .put(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME,
+ Granularity.DAY.truncate(
+ new DateTime((long) input
+ .get(DruidTable.DEFAULT_TIMESTAMP_COLUMN)))
+ .getMillis()
+ ).build());
+ }
+ }
+ );
+ for (DruidWritable druidWritable : druidWritables) {
+ druidRecordWriter.write(druidWritable);
+ }
+ druidRecordWriter.close(false);
+ List<DataSegment> dataSegmentList = DruidStorageHandlerUtils
+ .getPublishedSegments(segmentDescriptroPath, config);
+ Assert.assertEquals(1, dataSegmentList.size());
+ File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
+ new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir);
+ final QueryableIndex queryableIndex = DruidStorageHandlerUtils.INDEX_IO
+ .loadIndex(tmpUnzippedSegmentDir);
+
+ QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(queryableIndex);
+
+ Firehose firehose = new IngestSegmentFirehose(
+ ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
+ ImmutableList.of("host"),
+ ImmutableList.of("visited_sum", "unique_hosts"),
+ null,
+ QueryGranularities.NONE
+ );
+
+ List<InputRow> rows = Lists.newArrayList();
+ while (firehose.hasMore()) {
+ rows.add(firehose.nextRow());
+ }
+
+ verifyRows(expectedRows, rows);
+
+ }
+
+ private void verifyRows(List<ImmutableMap<String, Object>> expectedRows,
+ List<InputRow> actualRows
+ ) {
+ System.out.println("actualRows = " + actualRows);
+ Assert.assertEquals(expectedRows.size(), actualRows.size());
+
+ for (int i = 0; i < expectedRows.size(); i++) {
+ Map<String, Object> expected = expectedRows.get(i);
+ InputRow actual = actualRows.get(i);
+
+ Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
+
+ Assert.assertEquals(expected.get(DruidTable.DEFAULT_TIMESTAMP_COLUMN),
+ actual.getTimestamp().getMillis()
+ );
+ Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
+ Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
+ Assert.assertEquals(
+ (Double) expected.get("unique_hosts"),
+ (Double) HyperUniquesAggregatorFactory
+ .estimateCardinality(actual.getRaw("unique_hosts")),
+ 0.001
+ );
+ }
+ }
+
+ @Test
+ public void testSerDesr() throws IOException {
+ String segment = "{\"dataSource\":\"datasource2015\",\"interval\":\"2015-06-01T00:00:00.000-04:00/2015-06-02T00:00:00.000-04:00\",\"version\":\"2016-11-04T19:24:01.732-04:00\",\"loadSpec\":{\"type\":\"hdfs\",\"path\":\"hdfs://cn105-10.l42scl.hortonworks.com:8020/apps/hive/warehouse/druid.db/.hive-staging_hive_2016-11-04_19-23-50_168_1550339856804207572-1/_task_tmp.-ext-10002/_tmp.000000_0/datasource2015/20150601T000000.000-0400_20150602T000000.000-0400/2016-11-04T19_24_01.732-04_00/0/index.zip\"},\"dimensions\":\"dimension1\",\"metrics\":\"bigint\",\"shardSpec\":{\"type\":\"linear\",\"partitionNum\":0},\"binaryVersion\":9,\"size\":1765,\"identifier\":\"datasource2015_2015-06-01T00:00:00.000-04:00_2015-06-02T00:00:00.000-04:00_2016-11-04T19:24:01.732-04:00\"}";
+ DataSegment dataSegment = objectMapper.reader(DataSegment.class)
+ .readValue(segment);
+ Assert.assertTrue(dataSegment.getDataSource().equals("datasource2015"));
+ }
+
+}