You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2020/04/07 21:19:41 UTC
[druid] branch 0.18.0 updated: Reuse transformer in stream indexing
(#9625) (#9639)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.0 by this push:
new 4df4eac Reuse transformer in stream indexing (#9625) (#9639)
4df4eac is described below
commit 4df4eacf231501175b61cb830038a1d6ee769601
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Apr 7 14:19:30 2020 -0700
Reuse transformer in stream indexing (#9625) (#9639)
* Reuse transformer in stream indexing
* remove unused method
* memoize compiled pattern
---
.../druid/data/input/impl/RegexInputFormat.java | 29 +++++++-
.../apache/druid/data/input/impl/RegexReader.java | 7 +-
.../data/input/impl/RegexInputFormatTest.java | 72 +++++++++++++++++++
.../SeekableStreamIndexTaskRunner.java | 54 ++++----------
.../seekablestream/SettableByteEntityReader.java | 84 ++++++++++++++++++++++
.../indexing/seekablestream/StreamChunkParser.java | 81 +++++++++++++++++++++
.../druid/segment/transform/TransformSpec.java | 6 --
7 files changed, 283 insertions(+), 50 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
index 1165516..c3745bc 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/RegexInputFormat.java
@@ -20,7 +20,10 @@
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
@@ -29,12 +32,15 @@ import org.apache.druid.data.input.InputRowSchema;
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
+import java.util.regex.Pattern;
public class RegexInputFormat implements InputFormat
{
private final String pattern;
private final String listDelimiter;
private final List<String> columns;
+ @JsonIgnore
+ private final Supplier<Pattern> compiledPatternSupplier;
@JsonCreator
public RegexInputFormat(
@@ -46,6 +52,27 @@ public class RegexInputFormat implements InputFormat
this.pattern = pattern;
this.listDelimiter = listDelimiter;
this.columns = columns;
+ this.compiledPatternSupplier = Suppliers.memoize(() -> Pattern.compile(pattern));
+ }
+
+ @JsonProperty
+ public String getPattern()
+ {
+ return pattern;
+ }
+
+ @Nullable
+ @JsonProperty
+ public String getListDelimiter()
+ {
+ return listDelimiter;
+ }
+
+ @Nullable
+ @JsonProperty
+ public List<String> getColumns()
+ {
+ return columns;
}
@Override
@@ -57,6 +84,6 @@ public class RegexInputFormat implements InputFormat
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
- return new RegexReader(inputRowSchema, source, pattern, listDelimiter, columns);
+ return new RegexReader(inputRowSchema, source, pattern, compiledPatternSupplier.get(), listDelimiter, columns);
}
}
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java b/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java
index 2962ebd..e29e1df 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/RegexReader.java
@@ -42,7 +42,7 @@ import java.util.regex.Pattern;
public class RegexReader extends TextReader
{
private final String pattern;
- private final Pattern compiled;
+ private final Pattern compiledPattern;
private final Function<String, Object> multiValueFunction;
private List<String> columns;
@@ -51,13 +51,14 @@ public class RegexReader extends TextReader
InputRowSchema inputRowSchema,
InputEntity source,
String pattern,
+ Pattern compiledPattern,
@Nullable String listDelimiter,
@Nullable List<String> columns
)
{
super(inputRowSchema, source);
this.pattern = pattern;
- this.compiled = Pattern.compile(pattern);
+ this.compiledPattern = compiledPattern;
final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter));
this.columns = columns;
@@ -78,7 +79,7 @@ public class RegexReader extends TextReader
private Map<String, Object> parseLine(String line)
{
try {
- final Matcher matcher = compiled.matcher(line);
+ final Matcher matcher = compiledPattern.matcher(line);
if (!matcher.matches()) {
throw new ParseException("Incorrect Regex: %s . No match found.", pattern);
diff --git a/core/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java
new file mode 100644
index 0000000..9754a75
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/impl/RegexInputFormatTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.impl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputFormat;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class RegexInputFormatTest
+{
+ private final ObjectMapper mapper;
+
+ public RegexInputFormatTest()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerSubtypes(new NamedType(RegexInputFormat.class, "regex"));
+ }
+
+ @Test
+ public void testSerde() throws IOException
+ {
+ final RegexInputFormat expected = new RegexInputFormat(
+ "//[^\\r\\n]*[\\r\\n]",
+ "|",
+ ImmutableList.of("col1", "col2", "col3")
+ );
+
+ final byte[] json = mapper.writeValueAsBytes(expected);
+ final RegexInputFormat fromJson = (RegexInputFormat) mapper.readValue(json, InputFormat.class);
+
+ Assert.assertEquals(expected.getPattern(), fromJson.getPattern());
+ Assert.assertEquals(expected.getListDelimiter(), fromJson.getListDelimiter());
+ Assert.assertEquals(expected.getColumns(), fromJson.getColumns());
+ }
+
+ @Test
+ public void testIgnoreCompiledPatternInJson() throws IOException
+ {
+ final RegexInputFormat expected = new RegexInputFormat(
+ "//[^\\r\\n]*[\\r\\n]",
+ "|",
+ ImmutableList.of("col1", "col2", "col3")
+ );
+
+ final byte[] json = mapper.writeValueAsBytes(expected);
+ final Map<String, Object> map = mapper.readValue(json, Map.class);
+ Assert.assertFalse(map.containsKey("compiledPattern"));
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 9a18322..1577833 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -38,11 +38,9 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.Committer;
-import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
-import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
@@ -74,7 +72,6 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -206,6 +203,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private final SeekableStreamIndexTaskTuningConfig tuningConfig;
private final InputRowSchema inputRowSchema;
private final InputFormat inputFormat;
+ @Nullable
private final InputRowParser<ByteBuffer> parser;
private final AuthorizerMapper authorizerMapper;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
@@ -364,48 +362,24 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
log.info("Starting with sequences: %s", sequences);
}
- private List<InputRow> parseBytes(List<byte[]> valueBytess) throws IOException
- {
- if (parser != null) {
- return parseWithParser(valueBytess);
- } else {
- return parseWithInputFormat(valueBytess);
- }
- }
-
- private List<InputRow> parseWithParser(List<byte[]> valueBytess)
- {
- final List<InputRow> rows = new ArrayList<>();
- for (byte[] valueBytes : valueBytess) {
- rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes)));
- }
- return rows;
- }
-
- private List<InputRow> parseWithInputFormat(List<byte[]> valueBytess) throws IOException
- {
- final List<InputRow> rows = new ArrayList<>();
- for (byte[] valueBytes : valueBytess) {
- final InputEntityReader reader = task.getDataSchema().getTransformSpec().decorate(
- Preconditions.checkNotNull(inputFormat, "inputFormat").createReader(
- inputRowSchema,
- new ByteEntity(valueBytes),
- toolbox.getIndexingTmpDir()
- )
- );
- try (CloseableIterator<InputRow> rowIterator = reader.read()) {
- rowIterator.forEachRemaining(rows::add);
- }
- }
- return rows;
- }
-
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{
startTime = DateTimes.nowUtc();
status = Status.STARTING;
setToolbox(toolbox);
+
+ // Now we can initialize StreamChunkParser with the given toolbox.
+ final StreamChunkParser parser = new StreamChunkParser(
+ this.parser,
+ new SettableByteEntityReader(
+ inputFormat,
+ inputRowSchema,
+ task.getDataSchema().getTransformSpec(),
+ toolbox.getIndexingTmpDir()
+ )
+ );
+
initializeSequences();
if (chatHandlerProvider.isPresent()) {
@@ -657,7 +631,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
if (valueBytess == null || valueBytess.isEmpty()) {
rows = Utils.nullableListOf((InputRow) null);
} else {
- rows = parseBytes(valueBytess);
+ rows = parser.parse(valueBytess);
}
boolean isPersistRequired = false;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
new file mode 100644
index 0000000..e34ee67
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.transform.Transformer;
+import org.apache.druid.segment.transform.TransformingInputEntityReader;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * A settable {@link InputEntityReader}. This class is intended to be used only for stream parsing in Kafka or Kinesis
+ * indexing.
+ */
+class SettableByteEntityReader implements InputEntityReader
+{
+ private final InputFormat inputFormat;
+ private final InputRowSchema inputRowSchema;
+ private final Transformer transformer;
+ private final File indexingTmpDir;
+
+ private InputEntityReader delegate;
+
+ SettableByteEntityReader(
+ InputFormat inputFormat,
+ InputRowSchema inputRowSchema,
+ TransformSpec transformSpec,
+ File indexingTmpDir
+ )
+ {
+ this.inputFormat = Preconditions.checkNotNull(inputFormat, "inputFormat");
+ this.inputRowSchema = inputRowSchema;
+ this.transformer = transformSpec.toTransformer();
+ this.indexingTmpDir = indexingTmpDir;
+ }
+
+ void setEntity(ByteEntity entity) throws IOException
+ {
+ this.delegate = new TransformingInputEntityReader(
+ // Yes, we are creating a new reader for every stream chunk.
+ // This should be fine as long as initializing a reader is cheap, which it is for now.
+ inputFormat.createReader(inputRowSchema, entity, indexingTmpDir),
+ transformer
+ );
+ }
+
+ @Override
+ public CloseableIterator<InputRow> read() throws IOException
+ {
+ return delegate.read();
+ }
+
+ @Override
+ public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
+ {
+ return delegate.sample();
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
new file mode 100644
index 0000000..3f9b281
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader}
+ * or {@link InputRowParser}. This class will be useful until we remove the deprecated InputRowParser.
+ */
+class StreamChunkParser
+{
+ @Nullable
+ private final InputRowParser<ByteBuffer> parser;
+ private final SettableByteEntityReader byteEntityReader;
+
+ StreamChunkParser(@Nullable InputRowParser<ByteBuffer> parser, SettableByteEntityReader byteEntityReader)
+ {
+ this.parser = parser;
+ this.byteEntityReader = byteEntityReader;
+ }
+
+ List<InputRow> parse(List<byte[]> streamChunk) throws IOException
+ {
+ if (parser != null) {
+ return parseWithParser(parser, streamChunk);
+ } else {
+ return parseWithInputFormat(byteEntityReader, streamChunk);
+ }
+ }
+
+ private static List<InputRow> parseWithParser(InputRowParser<ByteBuffer> parser, List<byte[]> valueBytess)
+ {
+ final List<InputRow> rows = new ArrayList<>();
+ for (byte[] valueBytes : valueBytess) {
+ rows.addAll(parser.parseBatch(ByteBuffer.wrap(valueBytes)));
+ }
+ return rows;
+ }
+
+ private static List<InputRow> parseWithInputFormat(
+ SettableByteEntityReader byteEntityReader,
+ List<byte[]> valueBytess
+ ) throws IOException
+ {
+ final List<InputRow> rows = new ArrayList<>();
+ for (byte[] valueBytes : valueBytess) {
+ byteEntityReader.setEntity(new ByteEntity(valueBytes));
+ try (CloseableIterator<InputRow> rowIterator = byteEntityReader.read()) {
+ rowIterator.forEachRemaining(rows::add);
+ }
+ }
+ return rows;
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java b/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java
index 3eae123..6de7ac9 100644
--- a/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java
+++ b/processing/src/main/java/org/apache/druid/segment/transform/TransformSpec.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.transform;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
-import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser;
@@ -118,11 +117,6 @@ public class TransformSpec
return new TransformingInputSourceReader(reader, toTransformer());
}
- public InputEntityReader decorate(InputEntityReader reader)
- {
- return new TransformingInputEntityReader(reader, toTransformer());
- }
-
/**
* Create a {@link Transformer} from this TransformSpec, when the rows to be transformed do not have a known
* signature.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org