You are viewing a plain text version of this content. The canonical link for it is here (the link itself is not preserved in this plain-text copy).
Posted to commits@druid.apache.org by ji...@apache.org on 2020/04/24 19:43:19 UTC
[druid] branch 0.18.1 updated: Initialize SettableByteEntityReader only when inputFormat is not null (#9734) (#9767)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.18.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.1 by this push:
new ee2972a Initialize SettableByteEntityReader only when inputFormat is not null (#9734) (#9767)
ee2972a is described below
commit ee2972a470803c029d3fbf20122e01018fc3d53b
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Apr 24 12:43:05 2020 -0700
Initialize SettableByteEntityReader only when inputFormat is not null (#9734) (#9767)
* Lazy initialization of SettableByteEntityReader to avoid NPE
* Implement toInputFormat() for DelimitedParseSpec (TSV)
* address comments
* common code
---
.../druid/data/input/impl/DelimitedParseSpec.java | 7 +
.../SeekableStreamIndexTaskIOConfig.java | 1 +
.../SeekableStreamIndexTaskRunner.java | 11 +-
.../indexing/seekablestream/StreamChunkParser.java | 37 ++++-
.../seekablestream/StreamChunkParserTest.java | 176 +++++++++++++++++++++
5 files changed, 221 insertions(+), 11 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
index 5940e70..3ee0f71 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputFormat;
import org.apache.druid.java.util.common.parsers.DelimitedParser;
import org.apache.druid.java.util.common.parsers.Parser;
@@ -124,6 +125,12 @@ public class DelimitedParseSpec extends ParseSpec
}
@Override
+ public InputFormat toInputFormat()
+ {
+ return new DelimitedInputFormat(columns, listDelimiter, delimiter, hasHeaderRow, null, skipHeaderRows);
+ }
+
+ @Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new DelimitedParseSpec(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
index 9ac35a9..ae2cd88 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
@@ -126,6 +126,7 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
return inputFormat;
}
+ @Nullable
public InputFormat getInputFormat(ParseSpec parseSpec)
{
return inputFormat == null ? Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat() : inputFormat;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 1577833..bcdcecc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -202,6 +202,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
private final SeekableStreamIndexTaskTuningConfig tuningConfig;
private final InputRowSchema inputRowSchema;
+ @Nullable
private final InputFormat inputFormat;
@Nullable
private final InputRowParser<ByteBuffer> parser;
@@ -372,12 +373,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// Now we can initialize StreamChunkReader with the given toolbox.
final StreamChunkParser parser = new StreamChunkParser(
this.parser,
- new SettableByteEntityReader(
- inputFormat,
- inputRowSchema,
- task.getDataSchema().getTransformSpec(),
- toolbox.getIndexingTmpDir()
- )
+ inputFormat,
+ inputRowSchema,
+ task.getDataSchema().getTransformSpec(),
+ toolbox.getIndexingTmpDir()
);
initializeSequences();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
index 3f9b281..6061aff 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -19,12 +19,17 @@
package org.apache.druid.indexing.seekablestream;
+import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.transform.TransformSpec;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -38,20 +43,42 @@ class StreamChunkParser
{
@Nullable
private final InputRowParser<ByteBuffer> parser;
+ @Nullable
private final SettableByteEntityReader byteEntityReader;
- StreamChunkParser(@Nullable InputRowParser<ByteBuffer> parser, SettableByteEntityReader byteEntityReader)
+ /**
+ * Either parser or inputFormat shouldn't be null.
+ */
+ StreamChunkParser(
+ @Nullable InputRowParser<ByteBuffer> parser,
+ @Nullable InputFormat inputFormat,
+ InputRowSchema inputRowSchema,
+ TransformSpec transformSpec,
+ File indexingTmpDir
+ )
{
+ if (parser == null && inputFormat == null) {
+ throw new IAE("Either parser or inputFormat should be set");
+ }
this.parser = parser;
- this.byteEntityReader = byteEntityReader;
+ if (inputFormat != null) {
+ this.byteEntityReader = new SettableByteEntityReader(
+ inputFormat,
+ inputRowSchema,
+ transformSpec,
+ indexingTmpDir
+ );
+ } else {
+ this.byteEntityReader = null;
+ }
}
List<InputRow> parse(List<byte[]> streamChunk) throws IOException
{
- if (parser != null) {
- return parseWithParser(parser, streamChunk);
- } else {
+ if (byteEntityReader != null) {
return parseWithInputFormat(byteEntityReader, streamChunk);
+ } else {
+ return parseWithParser(parser, streamChunk);
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
new file mode 100644
index 0000000..76a02db
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class StreamChunkParserTest
+{
+ private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(null, null, null);
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testWithParserAndNullInputformatParseProperly() throws IOException
+ {
+ final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
+ new NotConvertibleToInputFormatParseSpec(),
+ StringUtils.UTF8_STRING
+ );
+ final StreamChunkParser chunkParser = new StreamChunkParser(
+ parser,
+ // Set nulls for all parameters below since inputFormat will be never used.
+ null,
+ null,
+ null,
+ null
+ );
+ parseAndAssertResult(chunkParser);
+ }
+
+ @Test
+ public void testWithNullParserAndInputformatParseProperly() throws IOException
+ {
+ final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap());
+ final StreamChunkParser chunkParser = new StreamChunkParser(
+ null,
+ inputFormat,
+ new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
+ TransformSpec.NONE,
+ temporaryFolder.newFolder()
+ );
+ parseAndAssertResult(chunkParser);
+ }
+
+ @Test
+ public void testWithNullParserAndNullInputformatFailToCreateParser()
+ {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("Either parser or inputFormat should be set");
+ final StreamChunkParser chunkParser = new StreamChunkParser(
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ }
+
+ @Test
+ public void testBothParserAndInputFormatParseProperlyUsingInputFormat() throws IOException
+ {
+ final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
+ new NotConvertibleToInputFormatParseSpec(),
+ StringUtils.UTF8_STRING
+ );
+ final TrackingJsonInputFormat inputFormat = new TrackingJsonInputFormat(
+ JSONPathSpec.DEFAULT,
+ Collections.emptyMap()
+ );
+ final StreamChunkParser chunkParser = new StreamChunkParser(
+ parser,
+ inputFormat,
+ new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
+ TransformSpec.NONE,
+ temporaryFolder.newFolder()
+ );
+ parseAndAssertResult(chunkParser);
+ Assert.assertTrue(inputFormat.used);
+ }
+
+ private void parseAndAssertResult(StreamChunkParser chunkParser) throws IOException
+ {
+ final String json = "{\"timestamp\": \"2020-01-01\", \"dim\": \"val\", \"met\": \"val2\"}";
+ List<InputRow> parsedRows = chunkParser.parse(Collections.singletonList(json.getBytes(StringUtils.UTF8_STRING)));
+ Assert.assertEquals(1, parsedRows.size());
+ InputRow row = parsedRows.get(0);
+ Assert.assertEquals(DateTimes.of("2020-01-01"), row.getTimestamp());
+ Assert.assertEquals("val", Iterables.getOnlyElement(row.getDimension("dim")));
+ Assert.assertEquals("val2", Iterables.getOnlyElement(row.getDimension("met")));
+ }
+
+ private static class NotConvertibleToInputFormatParseSpec extends JSONParseSpec
+ {
+ private NotConvertibleToInputFormatParseSpec()
+ {
+ super(
+ TIMESTAMP_SPEC,
+ DimensionsSpec.EMPTY,
+ JSONPathSpec.DEFAULT,
+ Collections.emptyMap()
+ );
+ }
+
+ @Override
+ public InputFormat toInputFormat()
+ {
+ return null;
+ }
+ }
+
+ private static class TrackingJsonInputFormat extends JsonInputFormat
+ {
+ private boolean used;
+
+ private TrackingJsonInputFormat(@Nullable JSONPathSpec flattenSpec, @Nullable Map<String, Boolean> featureSpec)
+ {
+ super(flattenSpec, featureSpec);
+ }
+
+ @Override
+ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+ {
+ used = true;
+ return super.createReader(inputRowSchema, source, temporaryDirectory);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org