Posted to commits@druid.apache.org by fj...@apache.org on 2020/04/07 21:19:41 UTC

[druid] branch 0.18.0 updated: Reuse transformer in stream indexing (#9625) (#9639)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.0 by this push:
     new 4df4eac  Reuse transformer in stream indexing (#9625) (#9639)
4df4eac is described below

commit 4df4eacf231501175b61cb830038a1d6ee769601
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Apr 7 14:19:30 2020 -0700

    Reuse transformer in stream indexing (#9625) (#9639)
    
    * Reuse transformer in stream indexing
    
    * remove unused method
    
    * memoize compiled pattern
---
 .../druid/data/input/impl/RegexInputFormat.java    | 29 +++++++-
 .../apache/druid/data/input/impl/RegexReader.java  |  7 +-
 .../data/input/impl/RegexInputFormatTest.java      | 72 +++++++++++++++++++
 .../SeekableStreamIndexTaskRunner.java             | 54 ++++----------
 .../seekablestream/SettableByteEntityReader.java   | 84 ++++++++++++++++++++++
 .../indexing/seekablestream/StreamChunkParser.java | 81 +++++++++++++++++++++
 .../druid/segment/transform/TransformSpec.java     |  6 --
 7 files changed, 283 insertions(+), 50 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
index 1165516..c3745bc 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
@@ -20,7 +20,10 @@
 package org.apache.druid.data.input.impl;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
@@ -29,12 +32,15 @@ import org.apache.druid.data.input.InputRowSchema;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.util.List;
+import java.util.regex.Pattern;
 
 public class RegexInputFormat implements InputFormat
 {
   private final String pattern;
   private final String listDelimiter;
   private final List<String> columns;
+  @JsonIgnore
+  private final Supplier<Pattern> compiledPatternSupplier;
 
   @JsonCreator
   public RegexInputFormat(
@@ -46,6 +52,27 @@ public class RegexInputFormat implements InputFormat
     this.pattern = pattern;
     this.listDelimiter = listDelimiter;
     this.columns = columns;
+    this.compiledPatternSupplier = Suppliers.memoize(() -> Pattern.compile(pattern));
+  }
+
+  @JsonProperty
+  public String getPattern()
+  {
+    return pattern;
+  }
+
+  @Nullable
+  @JsonProperty
+  public String getListDelimiter()
+  {
+    return listDelimiter;
+  }
+
+  @Nullable
+  @JsonProperty
+  public List<String> getColumns()
+  {
+    return columns;
   }
 
   @Override
@@ -57,6 +84,6 @@ public class RegexInputFormat implements InputFormat
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
-    return new RegexReader(inputRowSchema, source, pattern, listDelimiter, columns);
+    return new RegexReader(inputRowSchema, source, pattern, compiledPatternSupplier.get(), listDelimiter, columns);
   }
 }
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java b/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java
index 2962ebd..e29e1df 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java
@@ -42,7 +42,7 @@ import java.util.regex.Pattern;
 public class RegexReader extends TextReader
 {
   private final String pattern;
-  private final Pattern compiled;
+  private final Pattern compiledPattern;
   private final Function<String, Object> multiValueFunction;
 
   private List<String> columns;
@@ -51,13 +51,14 @@ public class RegexReader extends TextReader
       InputRowSchema inputRowSchema,
       InputEntity source,
       String pattern,
+      Pattern compiledPattern,
       @Nullable String listDelimiter,
       @Nullable List<String> columns
   )
   {
     super(inputRowSchema, source);
     this.pattern = pattern;
-    this.compiled = Pattern.compile(pattern);
+    this.compiledPattern = compiledPattern;
     final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
     this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter));
     this.columns = columns;
@@ -78,7 +79,7 @@ public class RegexReader extends TextReader
   private Map<String, Object> parseLine(String line)
   {
     try {
-      final Matcher matcher = compiled.matcher(line);
+      final Matcher matcher = compiledPattern.matcher(line);
 
       if (!matcher.matches()) {
         throw new ParseException("Incorrect Regex: %s . No match found.", pattern);
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java
new file mode 100644
index 0000000..9754a75
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputFormat;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class RegexInputFormatTest
+{
+  private final ObjectMapper mapper;
+
+  public RegexInputFormatTest()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerSubtypes(new NamedType(RegexInputFormat.class, "regex"));
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    final RegexInputFormat expected = new RegexInputFormat(
+        "//[^\\r\\n]*[\\r\\n]",
+        "|",
+        ImmutableList.of("col1", "col2", "col3")
+    );
+
+    final byte[] json = mapper.writeValueAsBytes(expected);
+    final RegexInputFormat fromJson = (RegexInputFormat) mapper.readValue(json, InputFormat.class);
+
+    Assert.assertEquals(expected.getPattern(), fromJson.getPattern());
+    Assert.assertEquals(expected.getListDelimiter(), fromJson.getListDelimiter());
+    Assert.assertEquals(expected.getColumns(), fromJson.getColumns());
+  }
+
+  @Test
+  public void testIgnoreCompiledPatternInJson() throws IOException
+  {
+    final RegexInputFormat expected = new RegexInputFormat(
+        "//[^\\r\\n]*[\\r\\n]",
+        "|",
+        ImmutableList.of("col1", "col2", "col3")
+    );
+
+    final byte[] json = mapper.writeValueAsBytes(expected);
+    final Map<String, Object> map = mapper.readValue(json, Map.class);
+    Assert.assertFalse(map.containsKey("compiledPattern"));
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 9a18322..1577833 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -38,11 +38,9 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.druid.data.input.Committer;
-import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.LookupNodeService;
@@ -74,7 +72,6 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.collect.Utils;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -206,6 +203,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   private final SeekableStreamIndexTaskTuningConfig tuningConfig;
   private final InputRowSchema inputRowSchema;
   private final InputFormat inputFormat;
+  @Nullable
   private final InputRowParser<ByteBuffer> parser;
   private final AuthorizerMapper authorizerMapper;
   private final Optional<ChatHandlerProvider> chatHandlerProvider;
@@ -364,48 +362,24 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     log.info("Starting with sequences: %s", sequences);
   }
 
-  private List<InputRow> parseBytes(List<byte[]> valueBytess) throws IOException
-  {
-    if (parser != null) {
-      return parseWithParser(valueBytess);
-    } else {
-      return parseWithInputFormat(valueBytess);
-    }
-  }
-
-  private List<InputRow> parseWithParser(List<byte[]> valueBytess)
-  {
-    final List<InputRow> rows = new ArrayList<>();
-    for (byte[] valueBytes : valueBytess) {
-      rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes)));
-    }
-    return rows;
-  }
-
-  private List<InputRow> parseWithInputFormat(List<byte[]> valueBytess) throws IOException
-  {
-    final List<InputRow> rows = new ArrayList<>();
-    for (byte[] valueBytes : valueBytess) {
-      final InputEntityReader reader = task.getDataSchema().getTransformSpec().decorate(
-          Preconditions.checkNotNull(inputFormat, "inputFormat").createReader(
-              inputRowSchema,
-              new ByteEntity(valueBytes),
-              toolbox.getIndexingTmpDir()
-          )
-      );
-      try (CloseableIterator<InputRow> rowIterator = reader.read()) {
-        rowIterator.forEachRemaining(rows::add);
-      }
-    }
-    return rows;
-  }
-
   private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
   {
     startTime = DateTimes.nowUtc();
     status = Status.STARTING;
 
     setToolbox(toolbox);
+
+    // Now we can initialize the StreamChunkParser with the given toolbox.
+    final StreamChunkParser parser = new StreamChunkParser(
+        this.parser,
+        new SettableByteEntityReader(
+            inputFormat,
+            inputRowSchema,
+            task.getDataSchema().getTransformSpec(),
+            toolbox.getIndexingTmpDir()
+        )
+    );
+
     initializeSequences();
 
     if (chatHandlerProvider.isPresent()) {
@@ -657,7 +631,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                 if (valueBytess == null || valueBytess.isEmpty()) {
                   rows = Utils.nullableListOf((InputRow) null);
                 } else {
-                  rows = parseBytes(valueBytess);
+                  rows = parser.parse(valueBytess);
                 }
                 boolean isPersistRequired = false;
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
new file mode 100644
index 0000000..e34ee67
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.transform.Transformer;
+import org.apache.druid.segment.transform.TransformingInputEntityReader;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * A settable {@link InputEntityReader}. This class is intended to be used only for stream parsing in Kafka or Kinesis
+ * indexing.
+ */
+class SettableByteEntityReader implements InputEntityReader
+{
+  private final InputFormat inputFormat;
+  private final InputRowSchema inputRowSchema;
+  private final Transformer transformer;
+  private final File indexingTmpDir;
+
+  private InputEntityReader delegate;
+
+  SettableByteEntityReader(
+      InputFormat inputFormat,
+      InputRowSchema inputRowSchema,
+      TransformSpec transformSpec,
+      File indexingTmpDir
+  )
+  {
+    this.inputFormat = Preconditions.checkNotNull(inputFormat, "inputFormat");
+    this.inputRowSchema = inputRowSchema;
+    this.transformer = transformSpec.toTransformer();
+    this.indexingTmpDir = indexingTmpDir;
+  }
+
+  void setEntity(ByteEntity entity) throws IOException
+  {
+    this.delegate = new TransformingInputEntityReader(
+        // Yes, we are creating a new reader for every stream chunk.
+        // This should be fine as long as initializing a reader is cheap which it is for now.
+        inputFormat.createReader(inputRowSchema, entity, indexingTmpDir),
+        transformer
+    );
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read() throws IOException
+  {
+    return delegate.read();
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
+  {
+    return delegate.sample();
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
new file mode 100644
index 0000000..3f9b281
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader}
+ * or {@link InputRowParser}. This class will be useful until we remove the deprecated InputRowParser.
+ */
+class StreamChunkParser
+{
+  @Nullable
+  private final InputRowParser<ByteBuffer> parser;
+  private final SettableByteEntityReader byteEntityReader;
+
+  StreamChunkParser(@Nullable InputRowParser<ByteBuffer> parser, SettableByteEntityReader byteEntityReader)
+  {
+    this.parser = parser;
+    this.byteEntityReader = byteEntityReader;
+  }
+
+  List<InputRow> parse(List<byte[]> streamChunk) throws IOException
+  {
+    if (parser != null) {
+      return parseWithParser(parser, streamChunk);
+    } else {
+      return parseWithInputFormat(byteEntityReader, streamChunk);
+    }
+  }
+
+  private static List<InputRow> parseWithParser(InputRowParser<ByteBuffer> parser, List<byte[]> valueBytess)
+  {
+    final List<InputRow> rows = new ArrayList<>();
+    for (byte[] valueBytes : valueBytess) {
+      rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes)));
+    }
+    return rows;
+  }
+
+  private static List<InputRow> parseWithInputFormat(
+      SettableByteEntityReader byteEntityReader,
+      List<byte[]> valueBytess
+  ) throws IOException
+  {
+    final List<InputRow> rows = new ArrayList<>();
+    for (byte[] valueBytes : valueBytess) {
+      byteEntityReader.setEntity(new ByteEntity(valueBytes));
+      try (CloseableIterator<InputRow> rowIterator = byteEntityReader.read()) {
+        rowIterator.forEachRemaining(rows::add);
+      }
+    }
+    return rows;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java b/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java
index 3eae123..6de7ac9 100644
--- a/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java
+++ b/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.transform;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.StringInputRowParser;
@@ -118,11 +117,6 @@ public class TransformSpec
     return new TransformingInputSourceReader(reader, toTransformer());
   }
 
-  public InputEntityReader decorate(InputEntityReader reader)
-  {
-    return new TransformingInputEntityReader(reader, toTransformer());
-  }
-
   /**
    * Create a {@link Transformer} from this TransformSpec, when the rows to be transformed do not have a known
    * signature.

