You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/03/28 08:33:06 UTC

[GitHub] [druid] bananaaggle opened a new pull request #11040: add avro stream input format

bananaaggle opened a new pull request #11040:
URL: https://github.com/apache/druid/pull/11040


   Because of deprecated of parseSpec, I develop AvroStreamInputFormat for new interface, which supports stream ingestion for data encoded by Avro.
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a change in pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #11040:
URL: https://github.com/apache/druid/pull/11040#discussion_r610949007



##########
File path: extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Objects;
+
+public class AvroStreamInputFormat extends NestedInputFormat
+{
+  private final boolean binaryAsString;
+
+  private final AvroBytesDecoder avroBytesDecoder;
+
+  @JsonCreator
+  public AvroStreamInputFormat(
+      @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
+      @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
+      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString

Review comment:
       missing a `binaryAsString` getter annotated with `@JsonProperty` i think




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis edited a comment on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis edited a comment on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-817029533


   It seems like https://github.com/apache/druid/blob/master/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java#L50 is missing getter methods annotated with `@JsonProperty`, which I suspect is related to the test failure. I'm not really sure how the parser based integration test is passing since it doesn't seem like serializing the schema-registry bytes decoder should work... (looking into this).
   
   Could you add serialization round trip tests for more `AvroBytesDecoder` implementations to your unit tests with the input format so we can get coverage on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] bananaaggle commented on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-817264449


   @clintropolis  Hi, I fix this bug and pass integration-tests. Then I add one unit test for this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] bananaaggle commented on a change in pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on a change in pull request #11040:
URL: https://github.com/apache/druid/pull/11040#discussion_r611440603



##########
File path: extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Objects;
+
+public class AvroStreamInputFormat extends NestedInputFormat
+{
+  private final boolean binaryAsString;
+
+  private final AvroBytesDecoder avroBytesDecoder;
+
+  @JsonCreator
+  public AvroStreamInputFormat(
+      @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
+      @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
+      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis edited a comment on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis edited a comment on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-810975092


   >Sorry, I don't understand what you said. Do you mean an integration test is developing for Avro stream and when it finished, I can add a new json about this test? Or I need create this integration test by myself?
   
   Ah sorry, let me try to explain a bit more. So in the case of avro inline schema and avro schema registry, you should be able to just add the JSON files with the `InputFormat` template and get the tests for free. The integration test is [`ITKafkaIndexingServiceDataFormatTest`](https://github.com/apache/druid/blob/master/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java) is used to test the same data with kafka streaming using a variety of different data formats which are supported by kafka ingestion. It works by iterating over the JSON templates in https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data to test each data-format present in that directory with the same set of data. For each of these data formats in the integration tests, there is implemented a corresponding [`EventSerializer`](https://github.com/apache/druid/blob/master/integration-tests/src/main/java/org/apache/druid/te
 sting/utils/EventSerializer.java), which writes data for the tests to the Kafka stream for the format, and the parser or inputFormat templates are then used to construct a supervisor to spawn indexing tasks to read the data and run the actual tests to verify stuff is working correctly.
   
   [`AvroEventSerializer`](https://github.com/apache/druid/blob/master/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java), and [`AvroSchemaRegistryEventSerializer`](https://github.com/apache/druid/blob/master/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java) are already present because there are integration tests using the Avro stream parsers ([avro inline](https://github.com/apache/druid/blob/master/integration-tests/src/test/resources/stream/data/avro/parser/input_row_parser.json) and [avro schema registry](https://github.com/apache/druid/blob/master/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json)), they are just missing the input format templates because it didn't exist until this PR (JSON, CSV, and TSV do have input format templates, [which might be useful as a reference](https://github.com/apache/druid/tree/master/integration-tests/src/te
 st/resources/stream/data)). You should just be able to adapt those parser templates into the equivalent input format template.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] bananaaggle commented on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-810245683


   > code LGTM 👍
   > 
   > This PR needs docs added to https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md I think before it is ready to go.
   > 
   > I think it should also be relatively easy to add an integration test for this since we already have an integration test for the `Parser` implementation of Avro + Schema Registry. All that needs done is a new `input_format` directory be created in this location https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data/avro_schema_registry with a new `input_format.json` template (using the `InputFormat` instead of the `Parser`). See JSON for example: https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data/json. If this template is added, then _i think_ it should be automatically picked up and run as part of the kafka data format integration tests.
   
   Sorry, I don't understand what you said. Do you mean an integration test is developing for Avro stream and when it finished, I can add a new json about this test? Or I can create this integration test by myself? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] bananaaggle commented on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-808865922


   @clintropolis Hi, I implement AvroStreamInputFormat as I mentioned last weekend. Can you review it and help me refine it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis merged pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis merged pull request #11040:
URL: https://github.com/apache/druid/pull/11040


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis commented on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-810975092


   >Sorry, I don't understand what you said. Do you mean an integration test is developing for Avro stream and when it finished, I can add a new json about this test? Or I need create this integration test by myself?
   
   Ah sorry, let me try to explain a bit more. So in the case of avro inline schema and avro schema registry, you should be able to just add the JSON files with the `InputFormat` template and get the tests for free. The integration test is [`ITKafkaIndexingServiceDataFormatTest`](https://github.com/apache/druid/blob/master/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java) is used to test the same data with kafka streaming using a variety of different data formats which are supported by kafka. It works by iterating over the JSON templates in https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data to test each data-format present in that directory with the same set of data. For each of these data formats in the integration tests, there is implemented a corresponding [`EventSerializer`](https://github.com/apache/druid/blob/master/integration-tests/src/main/java/org/apache/druid/testing/util
 s/EventSerializer.java), which writes data for the tests to the Kafka stream for the format, and the parser or inputFormat templates are then used to construct a supervisor to read the data and run the actual tests to verify stuff is working correctly.
   
   [`AvroEventSerializer`](https://github.com/apache/druid/blob/master/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java), and [`AvroSchemaRegistryEventSerializer`](https://github.com/apache/druid/blob/master/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java) are already present because there are integration tests using the Avro stream parsers ([avro inline](https://github.com/apache/druid/blob/master/integration-tests/src/test/resources/stream/data/avro/parser/input_row_parser.json) and [avro schema registry](https://github.com/apache/druid/blob/master/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json)), they are just missing the input format templates because it didn't exist until this PR (JSON, CSV, and TSV do have input format templates, [which might be useful as a reference](https://github.com/apache/druid/tree/master/integration-tests/src/te
 st/resources/stream/data)). You should just be able to adapt those parser templates into the equivalent input format template.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis edited a comment on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis edited a comment on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-817023099


   I haven't quite determined what is going on yet, but it seems like there is some sort of serialization error that is causing the newly added schema-registry input format integration test to fail:
   ```
   Caused by: com.fasterxml.jackson.databind.exc.ValueInstantiationException: Cannot construct instance of `org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder`, problem: Expected at least one URL to be passed in constructor
    at [Source: (byte[])&quot;{&quot;type&quot;:&quot;index_kafka&quot;,&quot;id&quot;:&quot;index_kafka_kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?_a0ffca1f01390e3_ejndffbp&quot;,&quot;resource&quot;:{&quot;availabilityGroup&quot;:&quot;index_kafka_kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?_a0ffca1f01390e3&quot;,&quot;requiredCapacity&quot;:1},&quot;dataSchema&quot;:{&quot;dataSource&quot;:&quot;kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?&quot;,&quot;[truncated 4940 bytes]; line: 1, column: 5072] (through reference chain: org.apache.druid.indexing.kafka.KafkaIndexTask[&quot;ioConfig&quot;]-&gt;org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig[&quot;inputFormat&quot;]-&gt;org.apache.druid.data.input.avro.AvroStreamInputFormat[&quot;avroBytesDecoder&quot;])
   	at com.fasterxml.jackson.databind.exc.ValueInstantiationException.from(ValueInstantiationException.java:47)
   	at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1735)
   	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.wrapAsJsonMappingException(StdValueInstantiator.java:491)
   	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.rewrapCtorProblem(StdValueInstantiator.java:514)
   	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:285)
   ```
   https://travis-ci.com/github/apache/druid/jobs/496046398#L9197
   
   The inline schema test is passing :+1:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis edited a comment on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis edited a comment on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-810975092


   >Sorry, I don't understand what you said. Do you mean an integration test is developing for Avro stream and when it finished, I can add a new json about this test? Or I need create this integration test by myself?
   
   Ah sorry, let me try to explain a bit more. So in the case of avro inline schema and avro schema registry, you should be able to just add the JSON files with the `InputFormat` template and get the tests for free. The integration test is [`ITKafkaIndexingServiceDataFormatTest`](https://github.com/apache/druid/blob/master/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java) is used to test the same data with kafka streaming using a variety of different data formats which are supported by kafka ingestion. It works by iterating over the JSON templates in https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data to test each data-format present in that directory with the same set of data. For each of these data formats in the integration tests, there is implemented a corresponding [`EventSerializer`](https://github.com/apache/druid/blob/master/integration-tests/src/main/java/org/apache/druid/te
 sting/utils/EventSerializer.java), which writes data for the tests to the Kafka stream for the format, and the parser or inputFormat templates are then used to construct a supervisor to read the data and run the actual tests to verify stuff is working correctly.
   
   [`AvroEventSerializer`](https://github.com/apache/druid/blob/master/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java), and [`AvroSchemaRegistryEventSerializer`](https://github.com/apache/druid/blob/master/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroSchemaRegistryEventSerializer.java) are already present because there are integration tests using the Avro stream parsers ([avro inline](https://github.com/apache/druid/blob/master/integration-tests/src/test/resources/stream/data/avro/parser/input_row_parser.json) and [avro schema registry](https://github.com/apache/druid/blob/master/integration-tests/src/test/resources/stream/data/avro_schema_registry/parser/input_row_parser.json)), they are just missing the input format templates because it didn't exist until this PR (JSON, CSV, and TSV do have input format templates, [which might be useful as a reference](https://github.com/apache/druid/tree/master/integration-tests/src/te
 st/resources/stream/data)). You should just be able to adapt those parser templates into the equivalent input format template.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] bananaaggle edited a comment on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
bananaaggle edited a comment on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-810245683


   > code LGTM 👍
   > 
   > This PR needs docs added to https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md I think before it is ready to go.
   > 
   > I think it should also be relatively easy to add an integration test for this since we already have an integration test for the `Parser` implementation of Avro + Schema Registry. All that needs done is a new `input_format` directory be created in this location https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data/avro_schema_registry with a new `input_format.json` template (using the `InputFormat` instead of the `Parser`). See JSON for example: https://github.com/apache/druid/tree/master/integration-tests/src/test/resources/stream/data/json. If this template is added, then _i think_ it should be automatically picked up and run as part of the kafka data format integration tests.
   
   Sorry, I don't understand what you said. Do you mean an integration test is developing for Avro stream and when it finished, I can add a new json about this test? Or I need create this integration test by myself? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a change in pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #11040:
URL: https://github.com/apache/druid/pull/11040#discussion_r603707036



##########
File path: extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Objects;
+
+public class AvroStreamInputFormat extends NestedInputFormat
+{
+  private final boolean binaryAsString;
+
+  private final AvroBytesDecoder avroBytesDecoder;
+
+  @JsonCreator
+  public AvroStreamInputFormat(
+      @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
+      @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
+      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
+  )
+  {
+    super(flattenSpec);
+    this.avroBytesDecoder = avroBytesDecoder;
+    this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+  }
+
+  @Override
+  public boolean isSplittable()
+  {
+    return false;
+  }
+
+  @JsonProperty
+  public AvroBytesDecoder getAvroBytesDecoder()
+  {
+    return avroBytesDecoder;
+  }
+
+  @Override
+  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+  {
+    return new AvroStreamReader(
+        inputRowSchema,
+        source,
+        avroBytesDecoder,
+        getFlattenSpec(),
+        binaryAsString
+    );
+  }
+
+  @Override
+  public boolean equals(final Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final AvroStreamInputFormat that = (AvroStreamInputFormat) o;
+    return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) &&
+        Objects.equals(avroBytesDecoder, that.avroBytesDecoder);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(getFlattenSpec(), avroBytesDecoder);
+  }

Review comment:
       equality/hashcode should probably consider `binaryAsString` for their computations

##########
File path: extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.avro;
+
+import com.google.common.collect.Iterators;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class AvroStreamReader extends IntermediateRowParsingReader<GenericRecord>
+{
+  private final InputRowSchema inputRowSchema;
+  private final InputEntity source;
+  private final AvroBytesDecoder avroBytesDecoder;
+  private final ObjectFlattener<GenericRecord> recordFlattener;
+
+  AvroStreamReader(
+      InputRowSchema inputRowSchema,
+      InputEntity source,
+      AvroBytesDecoder avroBytesDecoder,
+      JSONPathSpec flattenSpec,
+      boolean binaryAsString
+  )
+  {
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.avroBytesDecoder = avroBytesDecoder;
+    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString));
+  }
+
+  @Override
+  protected CloseableIterator<GenericRecord> intermediateRowIterator() throws IOException
+  {
+    return CloseableIterators.withEmptyBaggage(
+        Iterators.singletonIterator(avroBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open())
+        ))));

Review comment:
       nit: strange formatting (occasionally style bot doesn't pick stuff up)
   
   ```suggestion
       return CloseableIterators.withEmptyBaggage(
           Iterators.singletonIterator(avroBytesDecoder.parse(ByteBuffer.wrap(IOUtils.toByteArray(source.open()))))
       );
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a change in pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #11040:
URL: https://github.com/apache/druid/pull/11040#discussion_r604807110



##########
File path: docs/ingestion/data-formats.md
##########
@@ -223,6 +223,51 @@ The Parquet `inputFormat` has the following components:
 |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
 | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
 
+### Avro Stream
+
+> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format.
+
+> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid
+
+The `inputFormat` to load data of Avro format in stream ingestion. An example is:
+```json
+"ioConfig": {
+  "inputFormat": {
+    "type": "avro_stream",
+    "avroBytesDecoder" : {
+      "type" : "schema_repo",
+      "subjectAndIdConverter" : {
+        "type" : "avro_1124",
+        "topic" : "${YOUR_TOPIC}"
+      },
+      "schemaRepository" : {
+        "type" : "avro_1124_rest_client",
+        "url" : "${YOUR_SCHEMA_REPO_END_POINT}",
+      }

Review comment:
       I think we should move https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md#avro-bytes-decoder (which currently lives with the 'parsers' documentation) up to this 'input formats' section, and have the `parsers` section link to the bytes decoder docs here.
   
   Also I suggest we should switch to using 'inline' or 'schema-registry' as the example instead of 'schema_repo', which isn't used as frequently in practice as far as I know.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis commented on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-817023099


   I haven't quite determined what is going on yet, but it seems like there is some sort of serialization error that is causing the newly added schema-registry integration test to fail:
   ```
   Caused by: com.fasterxml.jackson.databind.exc.ValueInstantiationException: Cannot construct instance of `org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder`, problem: Expected at least one URL to be passed in constructor
    at [Source: (byte[])&quot;{&quot;type&quot;:&quot;index_kafka&quot;,&quot;id&quot;:&quot;index_kafka_kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?_a0ffca1f01390e3_ejndffbp&quot;,&quot;resource&quot;:{&quot;availabilityGroup&quot;:&quot;index_kafka_kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?_a0ffca1f01390e3&quot;,&quot;requiredCapacity&quot;:1},&quot;dataSchema&quot;:{&quot;dataSource&quot;:&quot;kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?&quot;,&quot;[truncated 4940 bytes]; line: 1, column: 5072] (through reference chain: org.apache.druid.indexing.kafka.KafkaIndexTask[&quot;ioConfig&quot;]-&gt;org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig[&quot;inputFormat&quot;]-&gt;org.apache.druid.data.input.avro.AvroStreamInputFormat[&quot;avroBytesDecoder&quot;])
   	at com.fasterxml.jackson.databind.exc.ValueInstantiationException.from(ValueInstantiationException.java:47)
   	at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1735)
   	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.wrapAsJsonMappingException(StdValueInstantiator.java:491)
   	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.rewrapCtorProblem(StdValueInstantiator.java:514)
   	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:285)
   ```
   https://travis-ci.com/github/apache/druid/jobs/496046398#L9197


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a change in pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #11040:
URL: https://github.com/apache/druid/pull/11040#discussion_r604807988



##########
File path: docs/ingestion/data-formats.md
##########
@@ -223,6 +223,51 @@ The Parquet `inputFormat` has the following components:
 |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
 | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
 
+### Avro Stream
+
+> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format.
+
+> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid
+
+The `inputFormat` to load data of Avro format in stream ingestion. An example is:
+```json
+"ioConfig": {
+  "inputFormat": {
+    "type": "avro_stream",
+    "avroBytesDecoder" : {
+      "type" : "schema_repo",
+      "subjectAndIdConverter" : {
+        "type" : "avro_1124",
+        "topic" : "${YOUR_TOPIC}"
+      },
+      "schemaRepository" : {
+        "type" : "avro_1124_rest_client",
+        "url" : "${YOUR_SCHEMA_REPO_END_POINT}",
+      }
+    },
+    "flattenSpec": {
+      "useFieldDiscovery": true,
+      "fields": [
+        {
+          "type": "path",
+          "name": "someRecord_subInt",
+          "expr": "$.someRecord.subInt"
+        }
+      ]
+    },
+    "binaryAsString": false
+  },
+  ...
+}
+```
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+|type| String| This should be set to `avro_stream` to read Avro serialized data| yes |
+|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Avro record. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
+|`avroBytesDecoder`| JSON Object |Specifies how to decode bytes to Avro record. | yes |
+| binaryAsString | Boolean | Specifies if the bytes Avro column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
+

Review comment:
       I think we should move https://github.com/apache/druid/blob/master/docs/ingestion/data-formats.md#avro-bytes-decoder (which currently lives with the 'parsers' documentation) up to this 'input formats' section, and have the `parsers` section link to the bytes decoder docs here.

##########
File path: docs/ingestion/data-formats.md
##########
@@ -223,6 +223,51 @@ The Parquet `inputFormat` has the following components:
 |flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expression are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
 | binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
 
+### Avro Stream
+
+> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream input format.
+
+> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid
+
+The `inputFormat` to load data of Avro format in stream ingestion. An example is:
+```json
+"ioConfig": {
+  "inputFormat": {
+    "type": "avro_stream",
+    "avroBytesDecoder" : {
+      "type" : "schema_repo",
+      "subjectAndIdConverter" : {
+        "type" : "avro_1124",
+        "topic" : "${YOUR_TOPIC}"
+      },
+      "schemaRepository" : {
+        "type" : "avro_1124_rest_client",
+        "url" : "${YOUR_SCHEMA_REPO_END_POINT}",
+      }

Review comment:
       I suggest we should switch to using 'inline' or 'schema-registry' as the example instead of 'schema_repo', which isn't used as frequently in practice as far as I know.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
clintropolis commented on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-817029533


   It seems like https://github.com/apache/druid/blob/master/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java#L50 is missing getter methods annotated with `@JsonProperty`, which I suspect is related to the test failure. I'm not really sure how the parser based integration test is passing since it doesn't seem like serializing the schema-registry bytes decoder should work... (looking into this).
   
   Could you add serialization round trip tests for `AvroBytesDecoder` implementations to your unit tests so we can get coverage on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] bananaaggle commented on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-810877103


   @clintropolis Document done. Do I need to create integration test? Is there some examples of it? I'm interesting about it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] bananaaggle commented on pull request #11040: add avro stream input format

Posted by GitBox <gi...@apache.org>.
bananaaggle commented on pull request #11040:
URL: https://github.com/apache/druid/pull/11040#issuecomment-817058404


   > I haven't quite determined what is going on yet, but it seems like there is some sort of serialization error that is causing the newly added schema-registry input format integration test to fail:
   > 
   > ```
   > Caused by: com.fasterxml.jackson.databind.exc.ValueInstantiationException: Cannot construct instance of `org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder`, problem: Expected at least one URL to be passed in constructor
   >  at [Source: (byte[])&quot;{&quot;type&quot;:&quot;index_kafka&quot;,&quot;id&quot;:&quot;index_kafka_kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?_a0ffca1f01390e3_ejndffbp&quot;,&quot;resource&quot;:{&quot;availabilityGroup&quot;:&quot;index_kafka_kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?_a0ffca1f01390e3&quot;,&quot;requiredCapacity&quot;:1},&quot;dataSchema&quot;:{&quot;dataSource&quot;:&quot;kafka_data_format_indexing_service_test_c940ce28-01a9-4070-9dc0-27df2903249e %?????? ?? ??!?&quot;,&quot;[truncated 4940 bytes]; line: 1, column: 5072] (through reference chain: org.apache.druid.indexing.kafka.KafkaIndexTask[&quot;ioConfig&quot;]-&gt;org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig[&quot;inputFormat&quot;]-&gt;org.apache.druid.data.input.avro.AvroStreamInputFormat[&quot;avroBytesDecoder&quot;])
   > 	at com.fasterxml.jackson.databind.exc.ValueInstantiationException.from(ValueInstantiationException.java:47)
   > 	at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1735)
   > 	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.wrapAsJsonMappingException(StdValueInstantiator.java:491)
   > 	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.rewrapCtorProblem(StdValueInstantiator.java:514)
   > 	at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:285)
   > ```
   > 
   > https://travis-ci.com/github/apache/druid/jobs/496046398#L9197
   > 
   > The inline schema test is passing 👍
   
   I test it last week and I know something wrong with schema registry decoder. I will fix it this weekend, thanks for your advice and I will change code for this exception.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org