Posted to commits@druid.apache.org by ji...@apache.org on 2020/03/26 00:03:54 UTC
[druid] branch 0.18.0 updated: DruidSegmentReader should work if
timestamp is specified as a dimension (#9530) (#9566)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.0 by this push:
new 17d3022 DruidSegmentReader should work if timestamp is specified as a dimension (#9530) (#9566)
17d3022 is described below
commit 17d30222da268c18df6d5525c64c8d8d7ed4a7b6
Author: Suneet Saldanha <44...@users.noreply.github.com>
AuthorDate: Wed Mar 25 17:03:42 2020 -0700
DruidSegmentReader should work if timestamp is specified as a dimension (#9530) (#9566)
* DruidSegmentReader should work if timestamp is specified as a dimension
* Add integration tests
Tests for compaction and re-indexing a datasource with the timestamp column
* Instructions to run integration tests against quickstart
* Address PR feedback
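For context, a minimal sketch of the failure mode this commit addresses (hypothetical class and stand-in values, not part of the patch): DruidSegmentReader builds each intermediate row as a Map<String, Object>, and previously stored the row time under the key "timestamp" (TimestampSpec.DEFAULT_COLUMN) before copying in dimensions, so a dimension literally named "timestamp" silently replaced the time value and broke the DateTime cast downstream.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TimestampCollisionSketch
{
  public static void main(String[] args)
  {
    final Map<String, Object> row = new LinkedHashMap<>();
    // Old behavior: the row time was stored first, under the key "timestamp".
    // A long stands in for the DateTime value used by the real reader.
    row.put("timestamp", 1377910953000L);
    // A dimension named "timestamp" then clobbers the stored time value.
    row.put("timestamp", "2013-08-31T01:02:33Z");
    // parseInputRows() would cast this back to a DateTime and fail.
    System.out.println(row.get("timestamp").getClass()); // class java.lang.String
  }
}
```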
---
.../druid/indexing/input/DruidSegmentReader.java | 28 +++++--
integration-tests/README.md | 37 +++++++++-
integration-tests/quickstart-it.json | 16 ++++
.../druid/tests/indexer/ITCompactionTaskTest.java | 34 ++++++---
.../apache/druid/tests/indexer/ITIndexerTest.java | 53 ++++++++++++-
.../wikipedia_reindex_druid_input_source_task.json | 51 +++++++++++++
.../wikipedia_with_timestamp_index_task.json | 86 ++++++++++++++++++++++
7 files changed, 283 insertions(+), 22 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
index 5f45eb8..f2ce056 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
@@ -27,7 +27,6 @@ import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -61,6 +60,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
{
@@ -122,7 +122,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
@Override
protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow) throws ParseException
{
- final DateTime timestamp = (DateTime) intermediateRow.get(TimestampSpec.DEFAULT_COLUMN);
+ final DateTime timestamp = (DateTime) intermediateRow.get(ColumnHolder.TIME_COLUMN_NAME);
return Collections.singletonList(new MapBasedInputRow(timestamp.getMillis(), dimensions, intermediateRow));
}
@@ -209,8 +209,9 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
{
this.cursor = cursor;
- timestampColumnSelector =
- cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+ timestampColumnSelector = cursor
+ .getColumnSelectorFactory()
+ .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
dimSelectors = new HashMap<>();
for (String dim : dimensionNames) {
@@ -225,8 +226,9 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
metSelectors = new HashMap<>();
for (String metric : metricNames) {
- final BaseObjectColumnValueSelector metricSelector =
- cursor.getColumnSelectorFactory().makeColumnValueSelector(metric);
+ final BaseObjectColumnValueSelector metricSelector = cursor
+ .getColumnSelectorFactory()
+ .makeColumnValueSelector(metric);
metSelectors.put(metric, metricSelector);
}
}
@@ -240,9 +242,10 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
@Override
public Map<String, Object> next()
{
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
- final long timestamp = timestampColumnSelector.getLong();
- theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp));
for (Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
@@ -270,6 +273,15 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
theEvent.put(metric, value);
}
}
+
+ // Timestamp is added last because we expect that the time column will always be a DateTime object.
+ // If it is added earlier, it can be overwritten by metrics or dimensions with the same name.
+ //
+ // If a user names a metric or dimension `__time`, it will be overwritten. This case should be rare since
+ // `__time` is reserved for the time column in Druid segments.
+ final long timestamp = timestampColumnSelector.getLong();
+ theEvent.put(ColumnHolder.TIME_COLUMN_NAME, DateTimes.utc(timestamp));
+
cursor.advance();
return theEvent;
}
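After the patch, the reader keys the row time by the reserved `__time` column and writes it after dimensions and metrics, so user columns can no longer clobber it. A minimal sketch of the resulting map contents (hypothetical class and illustrative values):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FixedOrderingSketch
{
  public static void main(String[] args)
  {
    final Map<String, Object> row = new LinkedHashMap<>();
    row.put("timestamp", "2013-08-31T01:02:33Z"); // user dimension survives intact
    row.put("__time", 1377910953000L);            // reserved __time key, written last
    System.out.println(row); // {timestamp=2013-08-31T01:02:33Z, __time=1377910953000}
  }
}
```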
diff --git a/integration-tests/README.md b/integration-tests/README.md
index 0cc5b01..03c6851 100644
--- a/integration-tests/README.md
+++ b/integration-tests/README.md
@@ -66,7 +66,40 @@ Integration tests can also be run with either Java 8 or Java 11 by adding -Djvm.
can either be 8 or 11.
Druid's configuration (using Docker) can be overridden by providing -Doverride.config.path=<PATH_TO_FILE>.
-The file must contain one property per line, the key must start with druid_ and the format should be snake case.
+The file must contain one property per line, the key must start with `druid_` and the format should be snake case.
+
+Running Tests Using A Quickstart Cluster
+-------------------
+
+When writing integration tests, it can be helpful to test against a quickstart
+cluster so that you can set up remote debugging within your developer
+environment. This section walks you through setting up the integration tests
+so that they can run against a quickstart cluster running on your development
+machine.
+
+Note that not all features are enabled by default on a quickstart cluster, so it may
+not make sense to run the entire test suite against this configuration.
+
+Make sure you have at least 6GB of memory available before you run the tests.
+
+The tests rely on the files in the test/resources folder being available under the
+path /resources, so create a symlink to make them available:
+
+```
+ ln -s ${DRUID_HOME}/integration-tests/src/test/resources /resources
+```
+
+Set the cluster config file environment variable to the quickstart config:
+```
+ export CONFIG_FILE=${DRUID_HOME}/integration-tests/quickstart-it.json
+```
+Note that the quickstart cluster does not run with SSL, so to trick the integration
+tests the config specifies each `*_tls_url` as the same value as its HTTP URL.
+
+Then run the tests using a command similar to:
+```
+ mvn verify -P int-tests-config-file -Dit.test=<test_name>
+```
Running Tests Using A Configuration File for Any Cluster
-------------------
@@ -90,7 +123,7 @@ To run tests on any druid cluster that is already running, create a configuratio
"cloud_path": "<(optional) cloud_path for test data if running cloud integration test>",
}
-Set the environment variable CONFIG_FILE to the name of the configuration file:
+Set the environment variable `CONFIG_FILE` to the name of the configuration file:
```
export CONFIG_FILE=<config file name>
```
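Taken together, the quickstart workflow added to integration-tests/README.md above amounts to the following (the commands are copied from the diff; ITCompactionTaskTest is just an example `-Dit.test` value, taken from the tests touched by this commit):

```
ln -s ${DRUID_HOME}/integration-tests/src/test/resources /resources
export CONFIG_FILE=${DRUID_HOME}/integration-tests/quickstart-it.json
mvn verify -P int-tests-config-file -Dit.test=ITCompactionTaskTest
```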
diff --git a/integration-tests/quickstart-it.json b/integration-tests/quickstart-it.json
new file mode 100644
index 0000000..c4b05e4
--- /dev/null
+++ b/integration-tests/quickstart-it.json
@@ -0,0 +1,16 @@
+{
+ "broker_host" : "localhost",
+ "broker_port" : "8082",
+ "broker_tls_url" : "http://localhost:8082",
+ "router_host" : "localhost",
+ "router_port" : "8888",
+ "router_tls_url" : "http://localhost:8888",
+ "indexer_host" : "localhost",
+ "indexer_port" : "8081",
+ "historical_host" : "localhost",
+ "historical_port" : "8083",
+ "coordinator_host" : "localhost",
+ "coordinator_port" : "8081",
+ "middlemanager_host": "localhost",
+ "zookeeper_hosts": "localhost:2181"
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index 2c0dbc3..ec6ea90 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -28,13 +28,14 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -46,33 +47,47 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
+
private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
+ private static final String INDEX_TASK_WITH_TIMESTAMP = "/indexer/wikipedia_with_timestamp_index_task.json";
+
@Inject
private IntegrationTestingConfig config;
private String fullDatasourceName;
- @BeforeSuite
- public void setFullDatasourceName()
+ @BeforeMethod
+ public void setFullDatasourceName(Method method)
{
- fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix();
+ fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix() + "-" + method.getName();
}
@Test
public void testCompaction() throws Exception
{
- loadData();
+ loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE);
+ }
+
+ @Test
+ public void testCompactionWithTimestampDimension() throws Exception
+ {
+ loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE);
+ }
+
+ private void loadDataAndCompact(String indexTask, String queriesResource) throws Exception
+ {
+ loadData(indexTask);
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
try (final Closeable ignored = unloader(fullDatasourceName)) {
String queryResponseTemplate;
try {
- InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE);
+ InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queriesResource);
queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8);
}
catch (IOException e) {
- throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE);
+ throw new ISE(e, "could not read query file: %s", queriesResource);
}
queryResponseTemplate = StringUtils.replace(
@@ -92,10 +107,9 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
checkCompactionIntervals(intervalsBeforeCompaction);
}
}
-
- private void loadData() throws Exception
+ private void loadData(String indexTask) throws Exception
{
- String taskSpec = getResourceAsString(INDEX_TASK);
+ String taskSpec = getResourceAsString(indexTask);
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);
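The switch from @BeforeSuite to @BeforeMethod above relies on TestNG's native injection: a setup method that declares a java.lang.reflect.Method parameter receives the test method about to run, which lets each test derive a unique datasource name. A standalone sketch of the pattern (hypothetical class, not from the patch):

```java
import java.lang.reflect.Method;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class PerTestNameSketch
{
  private String datasourceName;

  @BeforeMethod
  public void setDatasourceName(Method method)
  {
    // TestNG injects the upcoming @Test method, so names stay unique per test.
    datasourceName = "wikipedia_index_test-" + method.getName();
  }

  @Test
  public void testCompaction()
  {
    assert datasourceName.endsWith("testCompaction");
  }
}
```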
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index 6ff804e..93ad11a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -34,16 +34,26 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
+ private static final String INDEX_WITH_TIMESTAMP_TASK = "/indexer/wikipedia_with_timestamp_index_task.json";
+ // TODO: add queries that validate timestamp is different from the __time column since it is a dimension
+ // TODO: https://github.com/apache/druid/issues/9565
+ private static final String INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+ private static final String INDEX_WITH_TIMESTAMP_DATASOURCE = "wikipedia_with_timestamp_index_test";
+
private static final String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json";
+ private static final String REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_reindex_druid_input_source_task.json";
private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries.json";
private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test";
@Test
public void testIndexData() throws Exception
{
+ final String reindexDatasource = REINDEX_DATASOURCE + "-testIndexData";
+ final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testIndexData-druidInputSource";
try (
final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
- final Closeable ignored2 = unloader(REINDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())
+ final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
+ final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
) {
doIndexTest(
INDEX_DATASOURCE,
@@ -55,10 +65,49 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
);
doReindexTest(
INDEX_DATASOURCE,
- REINDEX_DATASOURCE,
+ reindexDatasource,
+ REINDEX_TASK,
+ REINDEX_QUERIES_RESOURCE
+ );
+ doReindexTest(
+ INDEX_DATASOURCE,
+ reindexDatasourceWithDruidInputSource,
+ REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
+ REINDEX_QUERIES_RESOURCE
+ );
+ }
+ }
+
+ @Test
+ public void testReIndexDataWithTimestamp() throws Exception
+ {
+ final String reindexDatasource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp";
+ final String reindexDatasourceWithDruidInputSource = REINDEX_DATASOURCE + "-testReIndexDataWithTimestamp-druidInputSource";
+ try (
+ final Closeable ignored1 = unloader(INDEX_WITH_TIMESTAMP_DATASOURCE + config.getExtraDatasourceNameSuffix());
+ final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
+ final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
+ ) {
+ doIndexTest(
+ INDEX_WITH_TIMESTAMP_DATASOURCE,
+ INDEX_WITH_TIMESTAMP_TASK,
+ INDEX_WITH_TIMESTAMP_QUERIES_RESOURCE,
+ false,
+ true,
+ true
+ );
+ doReindexTest(
+ INDEX_WITH_TIMESTAMP_DATASOURCE,
+ reindexDatasource,
REINDEX_TASK,
REINDEX_QUERIES_RESOURCE
);
+ doReindexTest(
+ INDEX_WITH_TIMESTAMP_DATASOURCE,
+ reindexDatasourceWithDruidInputSource,
+ REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
+ REINDEX_QUERIES_RESOURCE
+ );
}
}
}
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task.json b/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task.json
new file mode 100644
index 0000000..3a5934c
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_reindex_druid_input_source_task.json
@@ -0,0 +1,51 @@
+{
+ "type": "index",
+ "spec": {
+ "ioConfig": {
+ "type": "index",
+ "inputSource": {
+ "type": "druid",
+ "dataSource": "%%DATASOURCE%%",
+ "interval": "2013-08-31/2013-09-01"
+ }
+ },
+ "tuningConfig": {
+ "type": "index",
+ "partitionsSpec": {
+ "type": "dynamic"
+ }
+ },
+ "dataSchema": {
+ "dataSource": "%%REINDEX_DATASOURCE%%",
+ "granularitySpec": {
+ "type": "uniform",
+ "queryGranularity": "SECOND",
+ "segmentGranularity": "DAY"
+ },
+ "timestampSpec": {
+ "column": "__time",
+ "format": "iso"
+ },
+ "dimensionsSpec": {
+ "dimensionExclusions" : ["robot", "continent"]
+ },
+ "metricsSpec": [
+ {
+ "type": "doubleSum",
+ "name": "added",
+ "fieldName": "added"
+ },
+ {
+ "type": "doubleSum",
+ "name": "deleted",
+ "fieldName": "deleted"
+ },
+ {
+ "type": "doubleSum",
+ "name": "delta",
+ "fieldName": "delta"
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_with_timestamp_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_with_timestamp_index_task.json
new file mode 100644
index 0000000..0d66383
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_with_timestamp_index_task.json
@@ -0,0 +1,86 @@
+{
+ "type": "index",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "%%DATASOURCE%%",
+ "metricsSpec": [
+ {
+ "type": "count",
+ "name": "count"
+ },
+ {
+ "type": "doubleSum",
+ "name": "added",
+ "fieldName": "added"
+ },
+ {
+ "type": "doubleSum",
+ "name": "deleted",
+ "fieldName": "deleted"
+ },
+ {
+ "type": "doubleSum",
+ "name": "delta",
+ "fieldName": "delta"
+ },
+ {
+ "name": "thetaSketch",
+ "type": "thetaSketch",
+ "fieldName": "user"
+ },
+ {
+ "name": "quantilesDoublesSketch",
+ "type": "quantilesDoublesSketch",
+ "fieldName": "delta"
+ },
+ {
+ "name": "HLLSketchBuild",
+ "type": "HLLSketchBuild",
+ "fieldName": "user"
+ }
+ ],
+ "granularitySpec": {
+ "segmentGranularity": "DAY",
+ "queryGranularity": "second",
+ "intervals" : [ "2013-08-31/2013-09-02" ]
+ },
+ "parser": {
+ "parseSpec": {
+ "format" : "json",
+ "timestampSpec": {
+ "column": "timestamp"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "page",
+ {"type": "string", "name": "language", "createBitmapIndex": false},
+ "user",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city",
+ "timestamp"
+ ]
+ }
+ }
+ }
+ },
+ "ioConfig": {
+ "type": "index",
+ "firehose": {
+ "type": "local",
+ "baseDir": "/resources/data/batch_index",
+ "filter": "wikipedia_index_data*"
+ }
+ },
+ "tuningConfig": {
+ "type": "index",
+ "maxRowsPerSegment": 3
+ }
+ }
+}
\ No newline at end of file