Posted to commits@druid.apache.org by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org> on 2023/02/08 09:49:35 UTC

[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #12852: Removes FiniteFirehoseFactory and its implementations

github-code-scanning[bot] commented on code in PR #12852:
URL: https://github.com/apache/druid/pull/12852#discussion_r1099894611


##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java:
##########
@@ -243,22 +241,20 @@
     } else {
       Preconditions.checkArgument(inputFormat == null);
       ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
-          new LocalFirehoseFactory(inputDirectory, filter, null),
+          null,
+          new LocalInputSource(inputDirectory, filter),
+          createInputFormatFromParseSpec(parseSpec),
           appendToExisting,
           dropExisting
       );
-      //noinspection unchecked
       ingestionSpec = new ParallelIndexIngestionSpec(
           new DataSchema(
-              "dataSource",
-              getObjectMapper().convertValue(
-                  new StringInputRowParser(parseSpec, null),
-                  Map.class
-              ),
+              DATASOURCE,
+              parseSpec.getTimestampSpec(),

Review Comment:
   ## Dereferenced variable may be null
   
   Variable `parseSpec` may be null at this access because of null arguments passed for it at two call sites.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4250)
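
On this branch the helper first checks `Preconditions.checkArgument(inputFormat == null)` and then relies on `parseSpec` being set, and the scanner traced call sites that pass null for it. A minimal sketch of one common remedy, assuming simplified stand-in types: fail fast with `Objects.requireNonNull` (Guava's `Preconditions.checkNotNull` works the same way) so a bad call reports which argument is missing instead of surfacing later as an anonymous `NullPointerException`. `NullGuardSketch`, `ParseSpecStub` and `timestampColumnOf` are hypothetical names, not Druid classes.

```java
import java.util.Objects;

final class NullGuardSketch
{
  // Hypothetical stand-in for a parse spec; only the timestamp column matters here.
  static final class ParseSpecStub
  {
    private final String timestampColumn;

    ParseSpecStub(String timestampColumn)
    {
      this.timestampColumn = timestampColumn;
    }

    String getTimestampColumn()
    {
      return timestampColumn;
    }
  }

  // Mirrors the shape of the test branch: inputFormat must be null, so parseSpec must not be.
  static String timestampColumnOf(Object inputFormat, ParseSpecStub parseSpec)
  {
    if (inputFormat != null) {
      throw new IllegalArgumentException("inputFormat must be null when a parseSpec is used");
    }
    // Fail fast with a descriptive message before dereferencing.
    ParseSpecStub spec = Objects.requireNonNull(parseSpec, "parseSpec is required when inputFormat is null");
    return spec.getTimestampColumn();
  }
}
```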



##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java:
##########
@@ -167,6 +174,87 @@
         OBJECT_MAPPER
     );
 
+    runSamplerAndCompareResponse(samplerSpec, true);
+  }
+
+  @Test
+  public void testWithInputRowParser() throws IOException
+  {
+    insertData(generateRecords(TOPIC));
+
+    ObjectMapper objectMapper = new DefaultObjectMapper();
+    TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("dim1"),
+            new StringDimensionSchema("dim1t"),
+            new StringDimensionSchema("dim2"),
+            new LongDimensionSchema("dimLong"),
+            new FloatDimensionSchema("dimFloat")
+        )
+    );
+    InputRowParser parser = new StringInputRowParser(new JSONParseSpec(timestampSpec, dimensionsSpec, JSONPathSpec.DEFAULT, null, null), "UTF8");
+
+    DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        objectMapper.readValue(objectMapper.writeValueAsBytes(parser), Map.class),
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("met1sum", "met1"),
+            new CountAggregatorFactory("rows")
+        },
+        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+        null,
+        objectMapper
+    );

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking `DataSchema.DataSchema` should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4238)
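
A test named `testWithInputRowParser` exists to cover the legacy parser path, so replacing the deprecated constructor there would remove the very coverage it provides. A minimal sketch of scoping `@SuppressWarnings("deprecation")` to only the method that deliberately calls deprecated API, using a hypothetical `LegacySchema` type rather than Druid's `DataSchema`. `javac` honors this annotation; whether the code-scanning query also honors it is not confirmed here, so the alert may still need to be dismissed separately.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical type with a legacy, map-based constructor kept for backward compatibility.
final class LegacySchema
{
  final String dataSource;
  final Map<String, Object> parserMap;

  @Deprecated
  LegacySchema(String dataSource, Map<String, Object> parserMap)
  {
    this.dataSource = dataSource;
    this.parserMap = parserMap;
  }
}

final class SuppressDeprecationSketch
{
  // Silences javac's deprecation diagnostic for this method only; deprecated calls
  // made from other methods of this class would still be reported.
  @SuppressWarnings("deprecation")
  static LegacySchema buildLegacySchemaForTest()
  {
    return new LegacySchema("test_ds", Collections.<String, Object>singletonMap("type", "string"));
  }
}
```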



##########
extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java:
##########
@@ -112,30 +118,93 @@
   }
 
   @Test(timeout = 10_000L)
-  public void testSample() throws Exception
+  public void testSample() throws InterruptedException
   {
-    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once();
-
-    recordSupplier.assign(ImmutableSet.of(StreamPartition.of(STREAM, SHARD_ID)));
-    EasyMock.expectLastCall().once();
+    KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
+        null,
+        DATA_SCHEMA,
+        null,
+        new KinesisSupervisorIOConfig(
+            STREAM,
+            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
 
-    recordSupplier.seekToEarliest(ImmutableSet.of(StreamPartition.of(STREAM, SHARD_ID)));
-    EasyMock.expectLastCall().once();
+    KinesisSamplerSpec samplerSpec = new TestableKinesisSamplerSpec(
+        supervisorSpec,
+        new SamplerConfig(5, null, null, null),
+        new InputSourceSampler(new DefaultObjectMapper()),
+        null
+    );
 
-    EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(STREAM)).once();
+    runSamplerAndCompareResponse(samplerSpec, true);
+  }
 
-    recordSupplier.close();
-    EasyMock.expectLastCall().once();
+  @Test
+  public void testWithInputRowParser() throws IOException
+  {
+    ObjectMapper objectMapper = new DefaultObjectMapper();
+    TimestampSpec timestampSpec = new TimestampSpec("timestamp", "iso", null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("dim1"),
+            new StringDimensionSchema("dim1t"),
+            new StringDimensionSchema("dim2"),
+            new LongDimensionSchema("dimLong"),
+            new FloatDimensionSchema("dimFloat")
+        )
+    );
+    InputRowParser parser = new StringInputRowParser(new JSONParseSpec(timestampSpec, dimensionsSpec, JSONPathSpec.DEFAULT, null, null), "UTF8");
 
-    replayAll();
+    DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        objectMapper.readValue(objectMapper.writeValueAsBytes(parser), Map.class),
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("met1sum", "met1"),
+            new CountAggregatorFactory("rows")
+        },
+        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+        null,
+        objectMapper
+    );

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking `DataSchema.DataSchema` should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4239)



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.indexing.SamplerResponse;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
+import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class SeekableStreamSamplerSpecTest extends EasyMockSupport
+{
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+  private static final String STREAM = "sampling";
+  private static final String SHARD_ID = "1";
+
+  private final SeekableStreamSupervisorSpec supervisorSpec = mock(SeekableStreamSupervisorSpec.class);
+
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  private final RecordSupplier recordSupplier = mock(RecordSupplier.class);
+
+  private static List<OrderedPartitionableRecord<String, String, ByteEntity>> generateRecords(String stream)
+  {
+    return ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "5",
+            jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+        ),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "6",
+            Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable")))
+        ),
+        new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}"))))
+    );
+  }
+
+  @Test(timeout = 10_000L)
+  public void testSampleWithInputRowParser() throws Exception
+  {
+    final DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        OBJECT_MAPPER.convertValue(
+            new StringInputRowParser(
+                new JSONParseSpec(
+                    new TimestampSpec("timestamp", "iso", null),
+                    new DimensionsSpec(
+                        Arrays.asList(
+                            new StringDimensionSchema("dim1"),
+                            new StringDimensionSchema("dim1t"),
+                            new StringDimensionSchema("dim2"),
+                            new LongDimensionSchema("dimLong"),
+                            new FloatDimensionSchema("dimFloat")
+                        )
+                    ),
+                    new JSONPathSpec(true, ImmutableList.of()),
+                    ImmutableMap.of(),
+                    false
+                )
+            ),
+            Map.class
+        ),
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("met1sum", "met1"),
+            new CountAggregatorFactory("rows")
+        },
+        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+        null,
+        OBJECT_MAPPER
+    );

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking `DataSchema.DataSchema` should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4244)
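
The first hunk in this digest shows the non-deprecated route: the schema receives `parseSpec.getTimestampSpec()` directly instead of a parser serialized to a `Map` with `convertValue(..., Map.class)`. A minimal sketch of why the explicit shape is preferred, using hypothetical stand-ins (`TimestampSpecStub`, `SchemaSketch`) and made-up flat map keys, not Druid's real classes, signatures or parser layout: the map-based constructor can only unpack its input with unchecked casts at runtime, so here it is kept solely as a deprecated adapter that delegates to the typed one.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a timestamp spec: column name plus format.
final class TimestampSpecStub
{
  final String column;
  final String format;

  TimestampSpecStub(String column, String format)
  {
    this.column = column;
    this.format = format;
  }
}

final class SchemaSketch
{
  final String dataSource;
  final TimestampSpecStub timestampSpec;
  final List<String> dimensions;

  // Preferred shape: every part of the schema is passed explicitly and checked by the compiler.
  SchemaSketch(String dataSource, TimestampSpecStub timestampSpec, List<String> dimensions)
  {
    this.dataSource = dataSource;
    this.timestampSpec = timestampSpec;
    this.dimensions = Collections.unmodifiableList(dimensions);
  }

  // Legacy shape: an untyped map (hypothetical flat keys) unpacked with casts, then delegated.
  @Deprecated
  @SuppressWarnings("unchecked")
  SchemaSketch(String dataSource, Map<String, Object> parserMap)
  {
    this(
        dataSource,
        new TimestampSpecStub((String) parserMap.get("timestampColumn"), (String) parserMap.get("timestampFormat")),
        (List<String>) parserMap.get("dimensions")
    );
  }
}
```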



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java:
##########
@@ -141,30 +139,42 @@
     }
 
     @Override
-    public Stream<InputSplit<Object>> getSplits(@Nullable SplitHintSpec splitHintSpec)
+    public Stream<InputSplit> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public int getNumSplits(@Nullable SplitHintSpec splitHintSpec)
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
     {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public FiniteFirehoseFactory withSplit(InputSplit split)
+    public SplittableInputSource withSplit(InputSplit split)
     {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public boolean needsFormat()
+    {
+      return false;
+    }
+
+    @Override
+    protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
+    {
+      return new SeekableStreamSamplerInputSourceReader(parser);
+    }
   }
 
-  private class SeekableStreamSamplerFirehose implements Firehose
+  private class SeekableStreamSamplerInputSourceReader implements InputSourceReader

Review Comment:
   ## Inner class could be static
   
   SeekableStreamSamplerInputSourceReader could be made static, since the enclosing instance is used only in its constructor.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4251)
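
A non-static inner class keeps an implicit reference to its enclosing instance for its whole lifetime. When, as the alert says, that instance is only read inside the constructor, the needed state can be passed in as constructor arguments and the class declared `static`, which drops the hidden reference. A minimal sketch of that refactoring with hypothetical names (`SamplerSourceSketch`, `RecordReader`), not the actual `SeekableStreamSamplerSpec` code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SamplerSourceSketch
{
  private final List<String> bufferedRecords;

  SamplerSourceSketch(List<String> bufferedRecords)
  {
    this.bufferedRecords = new ArrayList<>(bufferedRecords);
  }

  RecordReader openReader(String prefix)
  {
    // Everything the reader needs from the outer instance is handed over here,
    // so the nested class never has to capture SamplerSourceSketch.this.
    return new RecordReader(bufferedRecords, prefix);
  }

  // Declared static: instances carry no hidden reference to the enclosing object.
  static final class RecordReader
  {
    private final List<String> records;

    RecordReader(List<String> source, String prefix)
    {
      List<String> matching = new ArrayList<>();
      for (String record : source) {
        if (record.startsWith(prefix)) {
          matching.add(record);
        }
      }
      this.records = Collections.unmodifiableList(matching);
    }

    List<String> read()
    {
      return records;
    }
  }
}
```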



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java:
##########
@@ -0,0 +1,354 @@
+
+  @Test(timeout = 10_000L)
+  public void testSampleWithInputRowParser() throws Exception
+  {
+    final DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        OBJECT_MAPPER.convertValue(
+            new StringInputRowParser(
+                new JSONParseSpec(
+                    new TimestampSpec("timestamp", "iso", null),
+                    new DimensionsSpec(
+                        Arrays.asList(
+                            new StringDimensionSchema("dim1"),
+                            new StringDimensionSchema("dim1t"),
+                            new StringDimensionSchema("dim2"),
+                            new LongDimensionSchema("dimLong"),
+                            new FloatDimensionSchema("dimFloat")
+                        )
+                    ),
+                    new JSONPathSpec(true, ImmutableList.of()),
+                    ImmutableMap.of(),
+                    false
+                )
+            ),
+            Map.class
+        ),
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("met1sum", "met1"),
+            new CountAggregatorFactory("rows")
+        },
+        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+        null,
+        OBJECT_MAPPER
+    );
+
+    final SeekableStreamSupervisorIOConfig supervisorIOConfig = new TestableSeekableStreamSupervisorIOConfig(
+        STREAM,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        true,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once();
+    EasyMock.expect(supervisorSpec.getDataSchema()).andReturn(dataSchema).once();

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking `SeekableStreamSupervisorSpec.getDataSchema` should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4246)



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java:
##########
@@ -291,6 +295,83 @@
           null
       );
     }
+
+    @Test
+    public void testFailToConstructWhenBothInputSourceAndParserAreSet()
+    {
+      final ObjectMapper mapper = new DefaultObjectMapper();
+      final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+          null,
+          new InlineInputSource("test"),
+          null,
+          false,
+          null
+      );
+      final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
+          null,
+          null,
+          null,
+          10,
+          1000L,
+          null,
+          null,
+          null,
+          null,
+          new HashedPartitionsSpec(null, 10, null),
+          new IndexSpec(
+              new RoaringBitmapSerdeFactory(true),
+              CompressionStrategy.UNCOMPRESSED,
+              CompressionStrategy.LZF,
+              LongEncodingStrategy.LONGS
+          ),
+          new IndexSpec(),
+          1,
+          true,
+          true,
+          10000L,
+          OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+          null,
+          10,
+          100,
+          20L,
+          new Duration(3600),
+          128,
+          null,
+          null,
+          false,
+          null,
+          null,
+          null,
+          null,
+          null
+      );
+
+      expectedException.expect(IAE.class);
+      expectedException.expectMessage("Cannot use parser and inputSource together. Try using inputFormat instead of parser.");
+      new ParallelIndexIngestionSpec(
+          new DataSchema(
+              "datasource",
+              mapper.convertValue(
+                  new StringInputRowParser(
+                      new JSONParseSpec(
+                          new TimestampSpec(null, null, null),
+                          DimensionsSpec.EMPTY,
+                          null,
+                          null,
+                          null
+                      )
+                  ),
+                  Map.class
+              ),
+              null,
+              null,
+              null,
+              mapper
+          ),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking `DataSchema.DataSchema` should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4242)
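
The test expects construction to fail with "Cannot use parser and inputSource together. Try using inputFormat instead of parser." when a `DataSchema` built from a parser map is combined with an IO config that sets `inputSource`. A minimal sketch of such a mutual-exclusion check, assuming simplified inputs: it throws the JDK's `IllegalArgumentException` (Druid's `IAE` extends it), and `IngestionSpecValidator` is a hypothetical name that does not say where Druid actually performs the check.

```java
import java.util.Map;

final class IngestionSpecValidator
{
  static final String PARSER_AND_INPUT_SOURCE =
      "Cannot use parser and inputSource together. Try using inputFormat instead of parser.";

  // parserMap: legacy parser taken from the data schema, or null when absent.
  // inputSource: input source taken from the IO config, or null when absent.
  static void validate(Map<String, Object> parserMap, Object inputSource)
  {
    if (parserMap != null && inputSource != null) {
      throw new IllegalArgumentException(PARSER_AND_INPUT_SOURCE);
    }
  }
}
```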



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java:
##########
@@ -291,6 +295,83 @@
           null
       );
     }
+
+    @Test
+    public void testFailToConstructWhenBothInputSourceAndParserAreSet()
+    {
+      final ObjectMapper mapper = new DefaultObjectMapper();
+      final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+          null,
+          new InlineInputSource("test"),
+          null,
+          false,
+          null
+      );
+
+      expectedException.expect(IAE.class);
+      expectedException.expectMessage("Cannot use parser and inputSource together. Try using inputFormat instead of parser.");
+      new ParallelIndexIngestionSpec(
+          new DataSchema(
+              "datasource",
+              mapper.convertValue(
+                  new StringInputRowParser(
+                      new JSONParseSpec(
+                          new TimestampSpec(null, null, null),
+                          DimensionsSpec.EMPTY,
+                          null,
+                          null,
+                          null
+                      )
+                  ),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking `StringInputRowParser.StringInputRowParser` should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4243)



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java:
##########
@@ -0,0 +1,354 @@
+
+  @Test(timeout = 10_000L)
+  public void testSampleWithInputRowParser() throws Exception
+  {
+    final DataSchema dataSchema = new DataSchema(
+        "test_ds",
+        OBJECT_MAPPER.convertValue(
+            new StringInputRowParser(
+                new JSONParseSpec(
+                    new TimestampSpec("timestamp", "iso", null),
+                    new DimensionsSpec(
+                        Arrays.asList(
+                            new StringDimensionSchema("dim1"),
+                            new StringDimensionSchema("dim1t"),
+                            new StringDimensionSchema("dim2"),
+                            new LongDimensionSchema("dimLong"),
+                            new FloatDimensionSchema("dimFloat")
+                        )
+                    ),
+                    new JSONPathSpec(true, ImmutableList.of()),
+                    ImmutableMap.of(),
+                    false
+                )
+            ),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking `StringInputRowParser.StringInputRowParser` should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4245)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@druid.apache.org