You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/04/24 19:43:19 UTC

[druid] branch 0.18.1 updated: Initialize SettableByteEntityReader only when inputFormat is not null (#9734) (#9767)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.18.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.1 by this push:
     new ee2972a  Initialize SettableByteEntityReader only when inputFormat is not null (#9734) (#9767)
ee2972a is described below

commit ee2972a470803c029d3fbf20122e01018fc3d53b
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Apr 24 12:43:05 2020 -0700

    Initialize SettableByteEntityReader only when inputFormat is not null (#9734) (#9767)
    
    * Lazy initialization of SettableByteEntityReader to avoid NPE
    
    * toInputFormat for tsv
    
    * address comments
    
    * common code
---
 .../druid/data/input/impl/DelimitedParseSpec.java  |   7 +
 .../SeekableStreamIndexTaskIOConfig.java           |   1 +
 .../SeekableStreamIndexTaskRunner.java             |  11 +-
 .../indexing/seekablestream/StreamChunkParser.java |  37 ++++-
 .../seekablestream/StreamChunkParserTest.java      | 176 +++++++++++++++++++++
 5 files changed, 221 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
index 5940e70..3ee0f71 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/DelimitedParseSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.data.input.impl;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.java.util.common.parsers.DelimitedParser;
 import org.apache.druid.java.util.common.parsers.Parser;
 
@@ -124,6 +125,12 @@ public class DelimitedParseSpec extends ParseSpec
   }
 
   @Override
+  public InputFormat toInputFormat()
+  {
+    return new DelimitedInputFormat(columns, listDelimiter, delimiter, hasHeaderRow, null, skipHeaderRows);
+  }
+
+  @Override
   public ParseSpec withTimestampSpec(TimestampSpec spec)
   {
     return new DelimitedParseSpec(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
index 9ac35a9..ae2cd88 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java
@@ -126,6 +126,7 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
     return inputFormat;
   }
 
+  @Nullable
   public InputFormat getInputFormat(ParseSpec parseSpec)
   {
     return inputFormat == null ? Preconditions.checkNotNull(parseSpec, "parseSpec").toInputFormat() : inputFormat;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 1577833..bcdcecc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -202,6 +202,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
   private final SeekableStreamIndexTaskTuningConfig tuningConfig;
   private final InputRowSchema inputRowSchema;
+  @Nullable
   private final InputFormat inputFormat;
   @Nullable
   private final InputRowParser<ByteBuffer> parser;
@@ -372,12 +373,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     // Now we can initialize StreamChunkReader with the given toolbox.
     final StreamChunkParser parser = new StreamChunkParser(
         this.parser,
-        new SettableByteEntityReader(
-            inputFormat,
-            inputRowSchema,
-            task.getDataSchema().getTransformSpec(),
-            toolbox.getIndexingTmpDir()
-        )
+        inputFormat,
+        inputRowSchema,
+        task.getDataSchema().getTransformSpec(),
+        toolbox.getIndexingTmpDir()
     );
 
     initializeSequences();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
index 3f9b281..6061aff 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -19,12 +19,17 @@
 
 package org.apache.druid.indexing.seekablestream;
 
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.transform.TransformSpec;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -38,20 +43,42 @@ class StreamChunkParser
 {
   @Nullable
   private final InputRowParser<ByteBuffer> parser;
+  @Nullable
   private final SettableByteEntityReader byteEntityReader;
 
-  StreamChunkParser(@Nullable InputRowParser<ByteBuffer> parser, SettableByteEntityReader byteEntityReader)
+  /**
+   * At least one of parser and inputFormat must be non-null. When both are set, inputFormat is used for parsing.
+   */
+  StreamChunkParser(
+      @Nullable InputRowParser<ByteBuffer> parser,
+      @Nullable InputFormat inputFormat,
+      InputRowSchema inputRowSchema,
+      TransformSpec transformSpec,
+      File indexingTmpDir
+  )
   {
+    if (parser == null && inputFormat == null) {
+      throw new IAE("Either parser or inputFormat should be set");
+    }
     this.parser = parser;
-    this.byteEntityReader = byteEntityReader;
+    if (inputFormat != null) {
+      this.byteEntityReader = new SettableByteEntityReader(
+          inputFormat,
+          inputRowSchema,
+          transformSpec,
+          indexingTmpDir
+      );
+    } else {
+      this.byteEntityReader = null;
+    }
   }
 
   List<InputRow> parse(List<byte[]> streamChunk) throws IOException
   {
-    if (parser != null) {
-      return parseWithParser(parser, streamChunk);
-    } else {
+    if (byteEntityReader != null) {
       return parseWithInputFormat(byteEntityReader, streamChunk);
+    } else {
+      return parseWithParser(parser, streamChunk);
     }
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
new file mode 100644
index 0000000..76a02db
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class StreamChunkParserTest
+{
+  private static final TimestampSpec TIMESTAMP_SPEC = new TimestampSpec(null, null, null);
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testWithParserAndNullInputformatParseProperly() throws IOException
+  {
+    final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
+        new NotConvertibleToInputFormatParseSpec(),
+        StringUtils.UTF8_STRING
+    );
+    final StreamChunkParser chunkParser = new StreamChunkParser(
+        parser,
+        // Pass null for all parameters below since inputFormat will never be used.
+        null,
+        null,
+        null,
+        null
+    );
+    parseAndAssertResult(chunkParser);
+  }
+
+  @Test
+  public void testWithNullParserAndInputformatParseProperly() throws IOException
+  {
+    final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap());
+    final StreamChunkParser chunkParser = new StreamChunkParser(
+        null,
+        inputFormat,
+        new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
+        TransformSpec.NONE,
+        temporaryFolder.newFolder()
+    );
+    parseAndAssertResult(chunkParser);
+  }
+
+  @Test
+  public void testWithNullParserAndNullInputformatFailToCreateParser()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Either parser or inputFormat should be set");
+    final StreamChunkParser chunkParser = new StreamChunkParser(
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+  }
+
+  @Test
+  public void testBothParserAndInputFormatParseProperlyUsingInputFormat() throws IOException
+  {
+    final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
+        new NotConvertibleToInputFormatParseSpec(),
+        StringUtils.UTF8_STRING
+    );
+    final TrackingJsonInputFormat inputFormat = new TrackingJsonInputFormat(
+        JSONPathSpec.DEFAULT,
+        Collections.emptyMap()
+    );
+    final StreamChunkParser chunkParser = new StreamChunkParser(
+        parser,
+        inputFormat,
+        new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()),
+        TransformSpec.NONE,
+        temporaryFolder.newFolder()
+    );
+    parseAndAssertResult(chunkParser);
+    Assert.assertTrue(inputFormat.used);
+  }
+
+  private void parseAndAssertResult(StreamChunkParser chunkParser) throws IOException
+  {
+    final String json = "{\"timestamp\": \"2020-01-01\", \"dim\": \"val\", \"met\": \"val2\"}";
+    List<InputRow> parsedRows = chunkParser.parse(Collections.singletonList(json.getBytes(StringUtils.UTF8_STRING)));
+    Assert.assertEquals(1, parsedRows.size());
+    InputRow row = parsedRows.get(0);
+    Assert.assertEquals(DateTimes.of("2020-01-01"), row.getTimestamp());
+    Assert.assertEquals("val", Iterables.getOnlyElement(row.getDimension("dim")));
+    Assert.assertEquals("val2", Iterables.getOnlyElement(row.getDimension("met")));
+  }
+
+  private static class NotConvertibleToInputFormatParseSpec extends JSONParseSpec
+  {
+    private NotConvertibleToInputFormatParseSpec()
+    {
+      super(
+          TIMESTAMP_SPEC,
+          DimensionsSpec.EMPTY,
+          JSONPathSpec.DEFAULT,
+          Collections.emptyMap()
+      );
+    }
+
+    @Override
+    public InputFormat toInputFormat()
+    {
+      return null;
+    }
+  }
+
+  private static class TrackingJsonInputFormat extends JsonInputFormat
+  {
+    private boolean used;
+
+    private TrackingJsonInputFormat(@Nullable JSONPathSpec flattenSpec, @Nullable Map<String, Boolean> featureSpec)
+    {
+      super(flattenSpec, featureSpec);
+    }
+
+    @Override
+    public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+    {
+      used = true;
+      return super.createReader(inputRowSchema, source, temporaryDirectory);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org