Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/03/04 02:35:29 UTC

[GitHub] [hive] davidov541 opened a new pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

davidov541 opened a new pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933
 
 
   Adds support for specifying a number of bytes to ignore at the beginning of each message. This is added in order to support Confluent's Avro message format for Kafka, which prefixes each message with five bytes (a magic byte followed by a four-byte schema ID). A pre-defined confluent format is given as well, which automatically skips the first five bytes.
   
   This is a resubmission of https://github.com/apache/hive/pull/526, which had been abandoned. Comments in that thread have been applied.
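
A minimal sketch of the byte-skipping idea described above. The class and method names (`SkipBytesExample`, `skipLeadingBytes`) are hypothetical and chosen for illustration; this is not the converter code from the PR, only one way the prefix could be dropped before handing the remaining bytes to an Avro reader.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration; not the PR's actual converter. */
final class SkipBytesExample {

  /** Returns a copy of {@code value} without its first {@code skipBytes} bytes. */
  static byte[] skipLeadingBytes(byte[] value, int skipBytes) {
    if (skipBytes < 0 || skipBytes > value.length) {
      throw new IllegalArgumentException(
          "Cannot skip " + skipBytes + " bytes of a " + value.length + "-byte message");
    }
    return Arrays.copyOfRange(value, skipBytes, value.length);
  }

  public static void main(String[] args) {
    // Five prefix bytes (magic byte plus four-byte schema ID), then a two-byte payload.
    byte[] message = {0, 0, 0, 0, 42, 10, 20};
    byte[] payload = skipLeadingBytes(message, 5);
    System.out.println(Arrays.toString(payload));
  }
}
```

Copying the tail keeps the sketch simple; a real converter might instead pass an offset and length to the decoder to avoid the extra allocation.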

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-596138315
 
 
   The full system test passed with results identical to the one above, and I was able to use the name SimpleRecord without any issues. All that remains is any final comments and for the build system to finish its checks.



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387864743
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
 
 Review comment:
   As long as we document it very well, I'm fine with that. I understand your thought process there, and it makes perfect sense. I'm also worried about developers who use Confluent not realizing that they need to skip bytes, and this value is meant for them: it isn't obvious when working with Confluent that you actually need to skip some bytes. Perhaps we can add some documentation somewhere obvious on the Hive Confluence stating this (not just as part of the documentation for this particular value).



[GitHub] [hive] cricket007 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-596033178
 
 
   What do you mean? The Avro file and Maven plugin are part of this PR 



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388077250
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   Sorry if I missed it, but is there a test case for when you do skip the bytes, but there's no url or literal given? 



[GitHub] [hive] b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387881145
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   @davidov541 please read the Javadoc of `java.lang.Integer#getInteger(java.lang.String)`. Its first line says:
   `* Determines the integer value of the system property with the
        * specified name.`
   In other words, it looks up a system property by name rather than parsing the string. Reading the code should help as well.
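
To illustrate the point, here is a small sketch contrasting the two calls. The property name `example.skip.bytes` and the class name are hypothetical, and the example assumes the JVM has no system property literally named `5`.

```java
import java.util.Properties;

/** Illustration only; the property name below is a made-up placeholder. */
final class GetIntegerPitfall {

  /** Treats the table property's VALUE as the NAME of a JVM system property. */
  static Integer viaGetInteger(Properties tbl, String key) {
    return Integer.getInteger(tbl.getProperty(key));
  }

  /** Parses the table property's value as a decimal integer. */
  static int viaParseInt(Properties tbl, String key) {
    return Integer.parseInt(tbl.getProperty(key));
  }

  public static void main(String[] args) {
    Properties tbl = new Properties();
    tbl.setProperty("example.skip.bytes", "5");

    // Looks for a system property called "5"; yields null when none exists.
    System.out.println("getInteger: " + viaGetInteger(tbl, "example.skip.bytes"));
    // Parses the string "5" itself.
    System.out.println("parseInt:   " + viaParseInt(tbl, "example.skip.bytes"));
  }
}
```

Because `Integer.getInteger` returns null rather than throwing when the system property is missing, the mistake can go unnoticed until the value is unboxed or compared.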



[GitHub] [hive] davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-596032664
 
 
   Agreed. It isn't difficult to do. The guidance I've been given in the past is to keep PRs tight and focused on the JIRA. Given that this is something that could be user-facing, it's easy enough to make a JIRA for this and a separate PR. I'll let @b-slim or a committer chime in on which direction we should go here.



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387931054
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro bytes converter.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord1 = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecord1AsBytes;
+
+  /**
+   * Emulate confluent avro producer that add 4 magic bits (int) before value bytes. The int represents the schema ID from schema registry.
+   */
+  @BeforeClass
+  public static void setUp() {
+    Map<String, String> config = Maps.newHashMap();
+    config.put("schema.registry.url", "http://localhost");
 
 Review comment:
   Fixed.



[GitHub] [hive] cricket007 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-596029870
 
 
   > since it will use the one built into the system, instead of theirs. 
   
   Move the Avro schema to src/test/resources and scope the plugin to the test lifecycle, and you shouldn't have that problem 



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387931357
 
 

 ##########
 File path: kafka-handler/pom.xml
 ##########
 @@ -118,8 +118,21 @@
       <version>1.7.30</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>5.4.0</version>
+      <scope>test</scope>
 
 Review comment:
   Added exclusion.



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387887820
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   Thanks @b-slim , I definitely skipped that multiple times when I was reading the code. That is a bizarre issue, but I'm glad you pointed it out. I've fixed it, and am running the tests now.



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388077675
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent's Avro serialzier or deserializer with the Confluent Schema Registry, you will need to remove five bytes from beginning of each message. These five bytes represent [a magic byte and a four-byte schema ID from registry.](https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format)
 
 Review comment:
   Typo: serializer
   
   Remove five bytes from *the* beginning of each
   
   From *the* Registry 



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387868922
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro bytes converter.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord1 = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecord1AsBytes;
+
+  /**
+   * Emulate confluent avro producer that add 4 magic bits (int) before value bytes. The int represents the schema ID from schema registry.
+   */
+  @BeforeClass
+  public static void setUp() {
+    Map<String, String> config = Maps.newHashMap();
+    config.put("schema.registry.url", "http://localhost");
 
 Review comment:
   Nit: `AbstractKafkaAvroSerDeConfig` has a constant for this property (`SCHEMA_REGISTRY_URL_CONFIG`)
   
   Also, add :8081 for the port 



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387865405
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
 
 Review comment:
   Confluent introducing a backwards incompatible change seems unlikely 



[GitHub] [hive] davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-596033740
 
 
   Ah you're right, sorry. There are other uses of Avro in the Hive codebase, and I thought this was one of them, but this was included with the original changeset. I'll fix that now.



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388038766
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   How about this: for now, we do not call this Confluent, and instead document that if you are using Confluent, you need to use skip bytes = 5. Once we implement the feature to use the schema ID properly, then we can use Confluent at that point. That way it is clear what functionality must be in place in order to have a separate SerDe type.
   
   On a related note, I don't see any reference to a JIRA for implementing the feature that uses the schema ID. Does either of you have that one? If not, should I create it and link it for reference?



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388367705
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   @b-slim : Based on my proposal above, I've removed the confluent serde type. I'm keeping the serde type parameter with the expectation that we'll implement schema registry use in the future, so having that distinction is important for that time.
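
As a rough sketch of what the serde type parsing could look like once the Confluent value is dropped: the enum name `BytesConverterTypeSketch` is hypothetical, and the explicit null check and `Locale.ROOT` are additions for illustration rather than something taken from the PR.

```java
import java.util.Locale;

/** Hypothetical reduced version of the serde type enum, for illustration only. */
enum BytesConverterTypeSketch {
  SKIP,
  NONE;

  /** Lenient parsing: unknown, blank, or missing values fall back to NONE. */
  static BytesConverterTypeSketch fromString(String value) {
    if (value == null) {
      return NONE;
    }
    try {
      // Locale.ROOT keeps upper-casing independent of the JVM's default locale.
      return valueOf(value.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      return NONE;
    }
  }

  public static void main(String[] args) {
    System.out.println(fromString(" skip "));
    System.out.println(fromString("confluent"));
  }
}
```

Catching only `IllegalArgumentException` keeps unrelated failures visible, at the cost of handling the null case separately.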



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387995005
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   Link for context: https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format
   
   However, there are *other* Registries, so I would like to make a point to have this be as extensible as possible. 
   
   - https://github.com/hortonworks/registry
   - https://github.com/Apicurio/apicurio-registry/
   
   [Confluent's branding says `Confluent Schema Registry`](https://docs.confluent.io/current/schema-registry/index.html), so I think it's self-explanatory enough.
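
For reference, a hedged sketch of reading the Confluent wire-format prefix described at the link above: one magic byte with value 0, followed by a four-byte big-endian schema ID. The class and method names are hypothetical, and nothing here contacts a registry; other registries may use a different prefix, which is part of why a generic skip count is attractive.

```java
import java.nio.ByteBuffer;

/** Illustration of the Confluent wire-format prefix; names are hypothetical. */
final class WireFormatHeader {
  static final byte MAGIC_BYTE = 0x0;
  static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

  /** Validates the magic byte and returns the schema ID stored in bytes 1..4. */
  static int readSchemaId(byte[] message) {
    if (message.length < HEADER_LENGTH) {
      throw new IllegalArgumentException("Message shorter than " + HEADER_LENGTH + " bytes");
    }
    // ByteBuffer reads in big-endian (network) order by default.
    ByteBuffer buffer = ByteBuffer.wrap(message);
    byte magic = buffer.get();
    if (magic != MAGIC_BYTE) {
      throw new IllegalArgumentException("Unexpected magic byte: " + magic);
    }
    return buffer.getInt();
  }

  public static void main(String[] args) {
    byte[] message = {0, 0, 0, 0, 42, 10, 20};
    System.out.println("schema ID = " + readSchemaId(message));
  }
}
```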



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387885875
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   tl;dr - Just use `Integer.parseInt(tbl.getProperty())`
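
To illustrate the difference: `Integer.getInteger(String)` treats its argument as the name of a JVM system property and returns `null` when no such property exists, whereas `Integer.parseInt(String)` parses the string itself. The sketch below uses a made-up property name (`example.skip.bytes`) and a hypothetical helper, and assumes a default of 0 when the property is absent.

```java
import java.util.Properties;

/** Hypothetical helper for illustration only; not part of the PR. */
final class SkipBytesProperty {
  // Made-up property name, used only in this example.
  static final String SKIP_BYTES = "example.skip.bytes";

  /** Parses the table property value; falls back to 0 when it is not set. */
  static int readSkipBytes(Properties tbl) {
    String raw = tbl.getProperty(SKIP_BYTES);
    return raw == null ? 0 : Integer.parseInt(raw.trim());
  }

  public static void main(String[] args) {
    Properties tbl = new Properties();
    tbl.setProperty(SKIP_BYTES, "5");

    // Looks up a system property literally named "5"; null unless one is defined.
    System.out.println(Integer.getInteger(tbl.getProperty(SKIP_BYTES)));

    // Parses the property value itself.
    System.out.println(readSkipBytes(tbl));
  }
}
```

Note that `Integer.parseInt(null)` throws a `NumberFormatException`, so the null check (or a default passed to `getProperty`) matters when the property is optional.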



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387998210
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName();
+    String avroBytesConverterProperty = tbl.getProperty(avroBytesConverterPropertyName, 
+      BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    String avroSkipBytesPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName();
+    Integer avroSkipBytes = Integer.parseInt(tbl.getProperty(avroSkipBytesPropertyName));
+    switch (avroByteConverterType) {
+      case CONFLUENT: return new AvroSkipBytesConverter(schema, 5);
+      case SKIP: return new AvroSkipBytesConverter(schema, avroSkipBytes);
+      default: return new AvroBytesConverter(schema);
 
 Review comment:
   Could fall through on the cases. 
   
   ```
   int skipBytes = avroSkipBytes;
   switch (avroByteConverterType) {
         case CONFLUENT: skipBytes = 5; // intentional fall-through to SKIP
         case SKIP: return new AvroSkipBytesConverter(schema, skipBytes);
         default: return new AvroBytesConverter(schema);
   }
   ```



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387878744
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   I think I'm confused what you're asking for then. The initialize function takes in a java.util.Properties object that has properties that have been set for the serde in the DDL for the table. It reads a few from that object, and then passes it to getByteConverterForAvroDelegate, where it is also used in the code added here. The usage of the properties object here matches what is being done in initialize, and seems to match what I would expect. These aren't pulling system properties of the JVM, or at least are not necessarily doing so, instead reading from the Properties object passed to us.
   
   Does that make sense, or am I way off base?



[GitHub] [hive] davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-595988456
 
 
   OK, I was able to successfully test this build using a Confluent single-node cluster and a Hive pseudo-standalone cluster. I was able to create a topic with a simple Avro schema and a few records, and then read that from Hive successfully.
   
   Confluent Cluster Production:
   ![image](https://user-images.githubusercontent.com/656337/76126740-8e5ee080-5fc5-11ea-8f6a-ed3d594e2547.png)
   
   Hive Table Creation and Querying:
   ![image](https://user-images.githubusercontent.com/656337/76126707-7ab37a00-5fc5-11ea-974b-3d25fafd1f2e.png)
   
   One thing I noticed was that on the Hive side, if I used the exact same schema as the SimpleRecord schema which we use for testing, I got the following error.
   
   ```
   2020-03-06T22:05:23,739  WARN [HiveServer2-Handler-Pool: Thread-165] thrift.ThriftCLIService: Error fetching results:
   org.apache.hive.service.cli.HiveSQLException: java.io.IOException: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
           at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:481) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.operation.OperationManager.getOperationNextRowSet(OperationManager.java:331) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:946) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:567) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.thrift.ThriftCLIService.FetchResults(ThriftCLIService.java:801) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1837) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1822) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_242]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_242]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
   Caused by: java.io.IOException: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
           at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:638) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:545) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:150) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:880) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.reexec.ReExecDriver.getResults(ReExecDriver.java:241) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:476) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           ... 13 more
   Caused by: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
           at org.apache.hadoop.hive.kafka.SimpleRecord.put(SimpleRecord.java:88) ~[kafka-handler-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.avro.generic.GenericData.setField(GenericData.java:690) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.hadoop.hive.kafka.KafkaSerDe$AvroBytesConverter.getWritable(KafkaSerDe.java:401) ~[kafka-handler-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.kafka.KafkaSerDe$AvroBytesConverter.getWritable(KafkaSerDe.java:367) ~[kafka-handler-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.kafka.KafkaSerDe.deserializeKWritable(KafkaSerDe.java:250) ~[kafka-handler-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.kafka.KafkaSerDe.deserialize(KafkaSerDe.java:238) ~[kafka-handler-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:619) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:545) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:150) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:880) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.reexec.ReExecDriver.getResults(ReExecDriver.java:241) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:476) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           ... 13 more
   ```
   
   It concerns me that an Avro schema we use for testing is being included in a final build, and also that using it in the Hive table gives this error. I think this is likely a separate issue, but I wanted to pass it by you (@b-slim) first before filing a separate JIRA and ignoring it for now.
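
As a rough illustration of the failure in the trace above: `SimpleRecord.put` casts the decoded value to `java.lang.String`, but the Avro reader handed it an `org.apache.avro.util.Utf8`, which is a `CharSequence` and not a `String`. The sketch below reproduces the same kind of cast failure with `StringBuilder` standing in for `Utf8` so that it runs without Avro on the classpath; the class and method names are hypothetical, and this is not a proposed fix for the generated class.

```java
/** Hypothetical stand-in for illustration only; not part of the PR or of Avro. */
final class CharSequenceCast {
  /** Mirrors a setter that assumes the decoder always supplies a String. */
  static String unsafeName(Object decoded) {
    return (String) decoded; // throws ClassCastException for any other CharSequence
  }

  /** Accepts any CharSequence (or other object) and converts it explicitly. */
  static String safeName(Object decoded) {
    return decoded == null ? null : decoded.toString();
  }

  public static void main(String[] args) {
    Object decoded = new StringBuilder("test"); // plays the role of Utf8 here
    System.out.println(safeName(decoded));
    try {
      unsafeName(decoded);
    } catch (ClassCastException e) {
      System.out.println("cast failed: " + e.getMessage());
    }
  }
}
```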



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388078819
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro SerDe with variable bytes skipped.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecordConfluentBytes;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Use the KafkaAvroSerializer from Confluent to serialize the simpleRecord. 
+   */
+  @BeforeClass
+  public static void setUp() {
+    Map<String, String> config = Maps.newHashMap();
+    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
+    KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient());
+    avroSerializer.configure(config, false);
+    simpleRecordConfluentBytes = avroSerializer.serialize("temp", simpleRecord);
+  }
+
+  private void runConversionTest(KafkaSerDe.AvroBytesConverter conv, byte[] serializedSimpleRecord) { 
 
 Review comment:
   Pass Integer in as a parameter, and you could move more code in here.
   
   For the case where there's no offset, pass in null 



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388000371
 
 

 ##########
 File path: kafka-handler/pom.xml
 ##########
 @@ -118,8 +118,27 @@
       <version>1.7.30</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>5.4.0</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
 
 Review comment:
   Avro itself is still needed as a compile-time & runtime dependency elsewhere. 



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387999601
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro bytes converter.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord1 = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecord1AsBytes;
+
+  /**
+   * Emulate confluent avro producer that add 4 magic bits (int) before value bytes. The int represents the schema ID from schema registry.
+   */
+  @BeforeClass
+  public static void setUp() {
+    Map<String, String> config = Maps.newHashMap();
+    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
+    KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient());
+    avroSerializer.configure(config, false);
+    simpleRecord1AsBytes = avroSerializer.serialize("temp", simpleRecord1);
+  }
+
+  /**
+   * Emulate - avro.serde.type = none (Default).
+   */
+  @Test
+  public void convertWithAvroBytesConverter() {
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroBytesConverter conv = new KafkaSerDe.AvroBytesConverter(schema);
+    AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecord1AsBytes);
+
+    Assert.assertNotNull(simpleRecord1Writable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+    SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+    Assert.assertNotNull(simpleRecord1Deserialized);
+    Assert.assertNotEquals(simpleRecord1, simpleRecord1Deserialized);
+  }
+
+  /**
+   * Emulate - avro.serde.type = confluent.
+   */
+  @Test
+  public void convertWithConfluentAvroBytesConverter() {
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, 5);
+    AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecord1AsBytes);
+
+    Assert.assertNotNull(simpleRecord1Writable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+    SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+    Assert.assertNotNull(simpleRecord1Deserialized);
+    Assert.assertEquals(simpleRecord1, simpleRecord1Deserialized);
+  }
+
+  /**
+   * Emulate - avro.serde.type = skip.
+   */
+  @Test
+  public void convertWithCustomAvroSkipBytesConverter() {
+    int offset = 2;
+    byte[] simpleRecordAsOffsetBytes = Arrays.copyOfRange(simpleRecord1AsBytes, 5 - offset, simpleRecord1AsBytes.length);
+
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, offset);
+    AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecordAsOffsetBytes);
+
+    Assert.assertNotNull(simpleRecord1Writable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+    SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+    Assert.assertNotNull(simpleRecord1Deserialized);
+    Assert.assertEquals(simpleRecord1, simpleRecord1Deserialized);
 
 Review comment:
   This looks like repeated code from the methods above. Refactor it? 



[GitHub] [hive] davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-596135777
 
 
   OK, I've verified that SimpleRecord is no longer in the generated JAR, and that tests run appropriately. I'm currently building a distribution to test in my system again, at which point I'll be confident this PR is correct and ready. 



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387998553
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -369,6 +402,26 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
     }
   }
 
+    /**
+     * The converter reads bytes from kafka message and skip first @skipBytes from beginning.
+     *
+     * For example:
+     *       The Confluent Avro serializer adds 5 magic bytes that represents Schema ID as Integer to the message.
+     */
+  static class AvroSkipBytesConverter extends AvroBytesConverter {
+    private final int skipBytes;
+
+    AvroSkipBytesConverter(Schema schema, int skipBytes) {
+      super(schema);
+      this.skipBytes = skipBytes;
+    }
+
+    @Override
+    Decoder getDecoder(byte[] value) {
+      return DecoderFactory.get().binaryDecoder(value, this.skipBytes, value.length - this.skipBytes, null);
 
 Review comment:
   Should there be validation that `skipBytes < value.length`?
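
One way such a check could look, as a minimal sketch: the helper name is hypothetical, and `IllegalArgumentException` is an assumption here (the SerDe may prefer its own exception type). It rejects a negative skip count as well as one that leaves no payload.

```java
/** Hypothetical helper for illustration only; not part of the PR. */
final class SkipBytesValidation {
  /** Returns the number of payload bytes left after skipping, or throws if none remain. */
  static int remainingLength(byte[] value, int skipBytes) {
    if (skipBytes < 0 || skipBytes >= value.length) {
      throw new IllegalArgumentException(
          "Cannot skip " + skipBytes + " bytes of a " + value.length + "-byte message");
    }
    return value.length - skipBytes;
  }

  public static void main(String[] args) {
    byte[] value = new byte[9];
    System.out.println(remainingLength(value, 5));
    try {
      remainingLength(new byte[3], 5);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The returned length could then be passed as the third argument of the `binaryDecoder(value, skipBytes, length, null)` call in `getDecoder`.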



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388540929
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,44 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) throws SerDeException {
+    String avroBytesConverterPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName();
+    String avroBytesConverterProperty = tbl.getProperty(avroBytesConverterPropertyName, 
+      BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    String avroSkipBytesPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName();
+    Integer avroSkipBytes = 0;
+    try {
+      Integer.parseInt(tbl.getProperty(avroSkipBytesPropertyName));
 
 Review comment:
   Dangit, you're right. I'll fix this and get a test for this too, since we should be catching these sorts of things in tests. I've got an old build around here of a Hive test cluster. I'll see if I can bring that up and give it a try.



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387999324
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro bytes converter.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord1 = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecord1AsBytes;
+
+  /**
+   * Emulate confluent avro producer that add 4 magic bits (int) before value bytes. The int represents the schema ID from schema registry.
+   */
+  @BeforeClass
+  public static void setUp() {
+    Map<String, String> config = Maps.newHashMap();
+    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
+    KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient());
+    avroSerializer.configure(config, false);
+    simpleRecord1AsBytes = avroSerializer.serialize("temp", simpleRecord1);
+  }
+
+  /**
+   * Emulate - avro.serde.type = none (Default).
+   */
+  @Test
+  public void convertWithAvroBytesConverter() {
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroBytesConverter conv = new KafkaSerDe.AvroBytesConverter(schema);
+    AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecord1AsBytes);
+
+    Assert.assertNotNull(simpleRecord1Writable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+    SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+    Assert.assertNotNull(simpleRecord1Deserialized);
+    Assert.assertNotEquals(simpleRecord1, simpleRecord1Deserialized);
+  }
+
+  /**
+   * Emulate - avro.serde.type = confluent.
+   */
+  @Test
+  public void convertWithConfluentAvroBytesConverter() {
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, 5);
+    AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecord1AsBytes);
+
+    Assert.assertNotNull(simpleRecord1Writable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+    SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+    Assert.assertNotNull(simpleRecord1Deserialized);
+    Assert.assertEquals(simpleRecord1, simpleRecord1Deserialized);
 
 Review comment:
   What does the `1` represent here?



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387931187
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -369,6 +402,26 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
     }
   }
 
+    /**
+     * The converter reads bytes from kafka message and skip first @skipBytes from beginning.
+     *
+     * For example:
+     *       Confluent kafka producer add 5 magic bytes that represents Schema ID as Integer to the message.
 
 Review comment:
   Fixed.



[GitHub] [hive] b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387860862
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   Not sure how to read this. Looking at the implementation of `Integer.getInteger`, I see it reads from the system properties, so this is clearly not going to work. Aha... Java naming.
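
The difference pointed out here can be sketched in a few lines: `Integer.getInteger(String)` treats its argument as the name of a JVM system property, while `Integer.parseInt(String)` parses the text itself. The key `example.skip.bytes` and the class name below are placeholders for illustration only.

```java
import java.util.Properties;

final class SkipBytesPropertyDemo {
  // Placeholder key for illustration only; not the handler's real property name.
  static final String SKIP_BYTES_KEY = "example.skip.bytes";

  // Looks up a SYSTEM property whose name is the table property's value ("5"),
  // which normally does not exist, so the result is null.
  static Integer viaGetInteger(Properties tbl) {
    return Integer.getInteger(tbl.getProperty(SKIP_BYTES_KEY));
  }

  // Parses the table property's value directly.
  static int viaParseInt(Properties tbl) {
    return Integer.parseInt(tbl.getProperty(SKIP_BYTES_KEY));
  }

  public static void main(String[] args) {
    Properties tbl = new Properties();
    tbl.setProperty(SKIP_BYTES_KEY, "5");
    System.out.println(viaGetInteger(tbl)); // null unless a system property named "5" is set
    System.out.println(viaParseInt(tbl));
  }
}
```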



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388037563
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName();
+    String avroBytesConverterProperty = tbl.getProperty(avroBytesConverterPropertyName, 
+      BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    String avroSkipBytesPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName();
+    Integer avroSkipBytes = Integer.parseInt(tbl.getProperty(avroSkipBytesPropertyName));
+    switch (avroByteConverterType) {
+      case CONFLUENT: return new AvroSkipBytesConverter(schema, 5);
+      case SKIP: return new AvroSkipBytesConverter(schema, avroSkipBytes);
+      default: return new AvroBytesConverter(schema);
 
 Review comment:
   This would be more confusing to me than the current code, personally. I will call out the NONE case, however, along with an error if it's not one of those three.
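
As a rough sketch of what "call out the NONE case, along with an error" could look like, the following keeps the three enum values from the diff and rejects anything else. The class name, the helper names, and the choice of `IllegalArgumentException` are assumptions made for this illustration, not the change that was actually committed.

```java
import java.util.Locale;

final class ConverterTypeDemo {
  enum BytesConverterType { CONFLUENT, SKIP, NONE }

  // Hypothetical strict parser: a missing value means NONE, an unrecognized value is an error.
  static BytesConverterType parse(String value) {
    if (value == null || value.trim().isEmpty()) {
      return BytesConverterType.NONE;
    }
    try {
      return BytesConverterType.valueOf(value.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Unknown Avro bytes converter type: " + value, e);
    }
  }

  // Number of leading bytes to skip for each type, with every case spelled out.
  static int bytesToSkip(BytesConverterType type, int configuredSkip) {
    switch (type) {
      case CONFLUENT: return 5;
      case SKIP: return configuredSkip;
      case NONE: return 0;
      default: throw new IllegalStateException("Unhandled converter type: " + type);
    }
  }

  public static void main(String[] args) {
    System.out.println(bytesToSkip(parse("confluent"), 0));
    System.out.println(bytesToSkip(parse("skip"), 7));
    System.out.println(bytesToSkip(parse(null), 7));
  }
}
```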



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388082372
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   I'm not sure how that is germane to this thread or to this changelist at all. Skipping the bytes does not currently have any connection to having a schema, which is already implemented as part of the Kafka Avro support. The system would work the same as it would if you didn't specify a schema in the first place. I don't see any tests in the other parts which test this, although there are plenty of query tests which test the use of the literal and URL properties, so I suspect there is one such test there. Given that that is orthogonal to this problem, however, I see no need to add another test to this changeset for that.



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387997559
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
 
 Review comment:
   Overall, I'm somewhat in agreement with @b-slim here. There is little reason to make a specific "subtype" if it is just documented that `avro.skip.bytes=5` will get the necessary Avro payload. 
   
   **However**, you would not know _which_ of those 5 bytes actually represents the schema ID in order to set `schema.literal` behind the scenes.  
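
To make the layout of those 5 bytes concrete, here is a small sketch that assumes the Confluent wire format of one magic byte (value 0) followed by a 4-byte big-endian schema ID. The class and method names are hypothetical, and nothing here calls the schema registry or is part of this pull request.

```java
import java.nio.ByteBuffer;

final class ConfluentHeaderDemo {
  static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

  // Extracts the schema ID from the header, assuming magic byte 0 and a big-endian int.
  static int schemaId(byte[] message) {
    if (message.length < HEADER_LENGTH) {
      throw new IllegalArgumentException("Message shorter than the 5-byte header");
    }
    ByteBuffer buf = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
    byte magic = buf.get();
    if (magic != 0) {
      throw new IllegalArgumentException("Unexpected magic byte: " + magic);
    }
    return buf.getInt();
  }

  public static void main(String[] args) {
    // Header for schema ID 42, followed by two arbitrary payload bytes.
    byte[] message = {0, 0, 0, 0, 42, 6, 97};
    System.out.println(schemaId(message));
  }
}
```

Skipping a fixed 5 bytes discards this ID, which is why a plain skip cannot be used to look up the writer schema.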



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387931635
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   OK, made the fix. Please check it to make sure I got it right this time.



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388037768
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -369,6 +402,26 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
     }
   }
 
+    /**
+     * The converter reads bytes from kafka message and skip first @skipBytes from beginning.
+     *
+     * For example:
+     *       The Confluent Avro serializer adds 5 magic bytes that represents Schema ID as Integer to the message.
+     */
+  static class AvroSkipBytesConverter extends AvroBytesConverter {
+    private final int skipBytes;
+
+    AvroSkipBytesConverter(Schema schema, int skipBytes) {
+      super(schema);
+      this.skipBytes = skipBytes;
+    }
+
+    @Override
+    Decoder getDecoder(byte[] value) {
+      return DecoderFactory.get().binaryDecoder(value, this.skipBytes, value.length - this.skipBytes, null);
 
 Review comment:
   BinaryDecoder already throws a nice ArrayIndexOutOfBoundsException in this case, so I'm going to update to catch that, wrap in a SerDe exception, and keep going.
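
The catch-and-wrap idea can be sketched without Avro or Hive on the classpath. Below, `ByteBuffer.wrap` stands in for the decoder call (it throws `IndexOutOfBoundsException` for an out-of-range offset or length), and `DemoSerDeException` stands in for Hive's `SerDeException`. Both substitutions, and all names, are assumptions for illustration.

```java
import java.nio.ByteBuffer;

final class SkipBytesWrapDemo {
  // Stand-in for Hive's SerDeException so the sketch compiles on its own.
  static class DemoSerDeException extends Exception {
    DemoSerDeException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  // Returns a view of the payload after the skipped prefix, wrapping bounds failures.
  static ByteBuffer payload(byte[] value, int skipBytes) throws DemoSerDeException {
    try {
      return ByteBuffer.wrap(value, skipBytes, value.length - skipBytes);
    } catch (IndexOutOfBoundsException e) {
      throw new DemoSerDeException(
          "Cannot skip " + skipBytes + " bytes of a " + value.length + "-byte message", e);
    }
  }

  public static void main(String[] args) {
    try {
      System.out.println(payload(new byte[8], 5).remaining());
      payload(new byte[3], 5); // too short: wrapped exception
    } catch (DemoSerDeException e) {
      System.out.println(e.getMessage());
    }
  }
}
```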



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388079696
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro SerDe with variable bytes skipped.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecordConfluentBytes;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Use the KafkaAvroSerializer from Confluent to serialize the simpleRecord. 
+   */
+  @BeforeClass
+  public static void setUp() {
+    Map<String, String> config = Maps.newHashMap();
+    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
+    KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient());
+    avroSerializer.configure(config, false);
+    simpleRecordConfluentBytes = avroSerializer.serialize("temp", simpleRecord);
+  }
+
+  private void runConversionTest(KafkaSerDe.AvroBytesConverter conv, byte[] serializedSimpleRecord) { 
 
 Review comment:
   We are instantiating KafkaSerDe.AvroBytesConverter for the normal case and KafkaSerDe.AvroSkipBytesConverter for the other cases. If we wanted to move creation of the converter into the function, it would require an if-else statement, making it more complex than just doing this. Additionally, this gives us the leeway to better add tests if more converters are needed later.



[GitHub] [hive] davidov541 edited a comment on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 edited a comment on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-595988456
 
 
   OK, I was able to successfully test this build using a Confluent single-node cluster and a Hive pseudo-standalone cluster. I was able to create a topic with a simple Avro schema and a few records, and then read that from Hive successfully.
   
   Confluent Cluster Production:
   ![image](https://user-images.githubusercontent.com/656337/76126740-8e5ee080-5fc5-11ea-8f6a-ed3d594e2547.png)
   
   Hive Table Creation and Querying:
   ![image](https://user-images.githubusercontent.com/656337/76126707-7ab37a00-5fc5-11ea-974b-3d25fafd1f2e.png)
   
   One thing I noticed was that on the Hive side, if I used the exact same schema as the SimpleRecord schema which we use for testing, I got the following error. As you can see in the screenshots, I was able to edit the field and schema names, and avoid this error, so it was specifically due to Hive pulling in the SimpleRecord class which we use for testing.
   
   ```
   2020-03-06T22:05:23,739  WARN [HiveServer2-Handler-Pool: Thread-165] thrift.ThriftCLIService: Error fetching results:
   org.apache.hive.service.cli.HiveSQLException: java.io.IOException: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
           at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:481) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.operation.OperationManager.getOperationNextRowSet(OperationManager.java:331) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:946) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:567) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.thrift.ThriftCLIService.FetchResults(ThriftCLIService.java:801) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1837) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1822) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_242]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_242]
           at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
   Caused by: java.io.IOException: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
           at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:638) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:545) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:150) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:880) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.reexec.ReExecDriver.getResults(ReExecDriver.java:241) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:476) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           ... 13 more
   Caused by: java.lang.ClassCastException: org.apache.avro.util.Utf8 cannot be cast to java.lang.String
           at org.apache.hadoop.hive.kafka.SimpleRecord.put(SimpleRecord.java:88) ~[kafka-handler-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.avro.generic.GenericData.setField(GenericData.java:690) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) ~[avro-1.8.2.jar:1.8.2]
           at org.apache.hadoop.hive.kafka.KafkaSerDe$AvroBytesConverter.getWritable(KafkaSerDe.java:401) ~[kafka-handler-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.kafka.KafkaSerDe$AvroBytesConverter.getWritable(KafkaSerDe.java:367) ~[kafka-handler-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.kafka.KafkaSerDe.deserializeKWritable(KafkaSerDe.java:250) ~[kafka-handler-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.kafka.KafkaSerDe.deserialize(KafkaSerDe.java:238) ~[kafka-handler-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:619) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:545) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:150) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:880) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hadoop.hive.ql.reexec.ReExecDriver.getResults(ReExecDriver.java:241) ~[hive-exec-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:476) ~[hive-service-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
           ... 13 more
   ```
   
   It concerns me that an Avro schema we use for testing is being included in a final build, and also that using it in the Hive table gives this error. I think this is likely a separate issue, but I wanted to pass it by you (@b-slim) first before filing a separate JIRA and ignoring it for now.
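
The `ClassCastException` in the trace comes from casting a decoded value to `java.lang.String` inside `SimpleRecord.put`. Avro's `Utf8` is a `CharSequence` but not a `String`, so a direct cast fails while `toString()` works. The sketch below uses `StringBuilder` as a stand-in for `Utf8` so that it runs without Avro. It illustrates the failure mode only and is not a diagnosis of why the decoder produced `Utf8` in this build.

```java
final class Utf8CastDemo {
  // Fails at runtime when value is a CharSequence that is not a java.lang.String.
  static String byCast(Object value) {
    return (String) value;
  }

  // Works for any CharSequence, and passes null through.
  static String byToString(Object value) {
    return value == null ? null : value.toString();
  }

  public static void main(String[] args) {
    Object decoded = new StringBuilder("test"); // stand-in for org.apache.avro.util.Utf8
    System.out.println(byToString(decoded));
    try {
      byCast(decoded);
    } catch (ClassCastException e) {
      System.out.println("cast failed");
    }
  }
}
```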



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387866510
 
 

 ##########
 File path: kafka-handler/pom.xml
 ##########
 @@ -190,5 +207,27 @@
         </executions>
       </plugin>
     </plugins>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro-maven-plugin</artifactId>
+          <version>1.8.1</version>
 
 Review comment:
   Is the Avro version stored in properties anywhere else? Confluent uses Avro 1.9.x now 



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387869095
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   Fixed. Please check that the new code is more readable and resolve this issue.



[GitHub] [hive] b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388535436
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,44 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) throws SerDeException {
+    String avroBytesConverterPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName();
+    String avroBytesConverterProperty = tbl.getProperty(avroBytesConverterPropertyName, 
+      BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    String avroSkipBytesPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName();
+    Integer avroSkipBytes = 0;
+    try {
+      Integer.parseInt(tbl.getProperty(avroSkipBytesPropertyName));
 
 Review comment:
   It seems to me that this is broken: the parsed value is never used. Did you try this code on actual machines?
   Can you please run this code against actual Confluent-based Avro records?
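
A minimal sketch of the kind of fix this comment points at, assuming the intent is for the table property to override the default of 0: the result of `Integer.parseInt` is assigned rather than discarded. The class and method names are hypothetical and not part of the PR; the property name `avro.serde.skip.bytes` is taken from the README change under review, and `IllegalArgumentException` stands in for the `SerDeException` the PR method declares, only to keep the example self-contained.

```java
import java.util.Properties;

/** Hypothetical helper, not part of the PR. */
final class SkipBytesProperty {
  // Property name as documented in the README change under review.
  static final String SKIP_BYTES_PROP = "avro.serde.skip.bytes";

  /** Returns the configured number of bytes to skip, or 0 if the property is unset. */
  static int parseSkipBytes(Properties tbl) {
    String raw = tbl.getProperty(SKIP_BYTES_PROP);
    if (raw == null) {
      return 0;
    }
    int skipBytes;
    try {
      // Keep the parsed value; discarding it silently leaves the default in place.
      skipBytes = Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          "Value of " + SKIP_BYTES_PROP + " is not an integer: " + raw, e);
    }
    if (skipBytes < 0) {
      throw new IllegalArgumentException(SKIP_BYTES_PROP + " must not be negative: " + raw);
    }
    return skipBytes;
  }

  public static void main(String[] args) {
    Properties tbl = new Properties();
    tbl.setProperty(SKIP_BYTES_PROP, "5");
    System.out.println(parseSkipBytes(tbl));
  }
}
```

Returning 0 for an unset property is a design choice made for this sketch; rejecting a missing value when the type is `skip` would be an equally reasonable reading of the thread.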



[GitHub] [hive] b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387858010
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
 
 Review comment:
   Hi, I'm not sure why we need this. Let's say Confluent changes the number of bytes to 4; would we then have to add a new enum field? I think adding skip bytes and documenting the Confluent use case is better. What do you think?
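
The argument above can be illustrated with a small sketch: a single prefix-skipping routine driven by a byte count covers the 5-byte Confluent case as well as any other fixed-length prefix, with no format-specific enum value. The class and method names are hypothetical; this is not the PR's `AvroSkipBytesConverter`, only an assumption about the kind of slicing it performs before Avro decoding.

```java
import java.util.Arrays;

/** Hypothetical illustration, not code from the PR. */
final class SkipPrefixSketch {
  /** Returns a copy of the message without its first skipBytes bytes. */
  static byte[] skipPrefix(byte[] value, int skipBytes) {
    if (skipBytes < 0 || skipBytes > value.length) {
      throw new IllegalArgumentException(
          "Cannot skip " + skipBytes + " bytes of a " + value.length + "-byte message");
    }
    return Arrays.copyOfRange(value, skipBytes, value.length);
  }

  public static void main(String[] args) {
    // Five prefix bytes followed by a two-byte payload.
    byte[] framed = {0, 0, 0, 0, 7, 10, 20};
    System.out.println(Arrays.toString(skipPrefix(framed, 5)));
    // A different prefix length needs only a different argument, not a new enum value.
    System.out.println(Arrays.toString(skipPrefix(framed, 4)));
  }
}
```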



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387868036
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with schema registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   Nit: `Schema Registry` is a product/noun. Should be capitalized. 



[GitHub] [hive] b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387973124
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   @davidov541 please help me understand this: I am guessing the first 5 bytes of the record are a schema ID that can be used to fetch the schema from the registry?
   **If that is correct, then I think we should not call this confluent, and let me explain why.**
   Imagine that in the near or far future someone implements a full-fledged Avro reader that uses those five bytes to look up the schema in the Schema Registry. What shall we call that one? Confluent V2? That is why I think the name is confusing. In my opinion it is better to call it skip, and document how it can be used to read data from the Confluent Avro SerDe. Please let me know if this makes sense to you.



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387866680
 
 

 ##########
 File path: kafka-handler/pom.xml
 ##########
 @@ -118,8 +118,21 @@
       <version>1.7.30</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>5.4.0</version>
+      <scope>test</scope>
 
 Review comment:
   Should you exclude Avro 1.9.x here? 



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387999851
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro bytes converter.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord1 = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecord1AsBytes;
+
+  /**
+   * Emulate confluent avro producer that add 4 magic bits (int) before value bytes. The int represents the schema ID from schema registry.
+   */
+  @BeforeClass
+  public static void setUp() {
+    Map<String, String> config = Maps.newHashMap();
+    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
+    KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient());
+    avroSerializer.configure(config, false);
+    simpleRecord1AsBytes = avroSerializer.serialize("temp", simpleRecord1);
+  }
+
+  /**
+   * Emulate - avro.serde.type = none (Default).
+   */
+  @Test
+  public void convertWithAvroBytesConverter() {
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroBytesConverter conv = new KafkaSerDe.AvroBytesConverter(schema);
+    AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecord1AsBytes);
+
+    Assert.assertNotNull(simpleRecord1Writable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+    SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+    Assert.assertNotNull(simpleRecord1Deserialized);
+    Assert.assertNotEquals(simpleRecord1, simpleRecord1Deserialized);
+  }
+
+  /**
+   * Emulate - avro.serde.type = confluent.
+   */
+  @Test
+  public void convertWithConfluentAvroBytesConverter() {
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, 5);
+    AvroGenericRecordWritable simpleRecord1Writable = conv.getWritable(simpleRecord1AsBytes);
+
+    Assert.assertNotNull(simpleRecord1Writable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecord1Writable.getRecord().getClass());
+
+    SimpleRecord simpleRecord1Deserialized = (SimpleRecord) simpleRecord1Writable.getRecord();
+
+    Assert.assertNotNull(simpleRecord1Deserialized);
+    Assert.assertEquals(simpleRecord1, simpleRecord1Deserialized);
+  }
+
+  /**
+   * Emulate - avro.serde.type = skip.
 
 Review comment:
   You have three methods that roughly do the same thing. 
   
   Might be a good use case for https://github.com/junit-team/junit4/wiki/Parameterized-tests



[GitHub] [hive] b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387888277
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   FYI, the goal of code review is not to tell devs what to do, but to help them understand why the code needs to be changed.
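
The thread does not spell out the concern with the quoted line, so the following is offered only as background on one pitfall visible in it: `Integer.getInteger(String)` interprets its argument as the name of a JVM system property, whereas `Integer.parseInt(String)` converts the string itself. The class and method names below are hypothetical.

```java
/** Hypothetical illustration, not code from the PR. */
final class GetIntegerVsParseInt {
  /** Looks up a JVM system property NAMED by the argument; null if no such property is set. */
  static Integer viaGetInteger(String value) {
    return Integer.getInteger(value);
  }

  /** Converts the argument itself to an int. */
  static int viaParseInt(String value) {
    return Integer.parseInt(value);
  }

  public static void main(String[] args) {
    // Stand-in for the string a table property lookup would return.
    String propertyValue = "5";
    // Unless a system property literally named "5" exists, this prints null.
    System.out.println(viaGetInteger(propertyValue));
    // This prints the numeric value of the string.
    System.out.println(viaParseInt(propertyValue));
  }
}
```

Unboxing a null result into an `int` would throw a `NullPointerException`, which is one reason the distinction matters in practice.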



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387931106
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with schema registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   Fixed.



[GitHub] [hive] davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-596573043
 
 
   @b-slim Pre-commit has finished successfully, with only 2 license warnings that are unrelated to this changeset, a mvn install failure due to there being changes in multiple components only, and an unrelated test failure. I believe that this changeset is now ready to commit, once I get your official +1 on it.



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387995318
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   ```suggestion
   If you use Confluent's Avro serializer/deserializer with their Schema Registry, you may want to remove the first 5 bytes from the beginning of the Kafka field, which represent a magic byte (0x0) & a numeric schema integer ID.
   ```
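
As a rough illustration of the framing described in this suggestion, the sketch below splits a message into the magic byte, the 4-byte schema ID, and the remaining Avro payload. The class and method names are hypothetical, and it assumes the ID is written in big-endian order, which is the default for `java.nio.ByteBuffer`; nothing here contacts a Schema Registry.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical illustration of the 5-byte prefix, not code from the PR. */
final class ConfluentFramingSketch {
  static final byte MAGIC_BYTE = 0x0;
  // One magic byte followed by a 4-byte integer schema ID.
  static final int HEADER_LENGTH = 5;

  /** Reads the schema ID from bytes 1..4 of the message. */
  static int schemaId(byte[] message) {
    return checkedHeader(message).getInt();
  }

  /** Returns the bytes after the prefix, i.e. what an Avro decoder would consume. */
  static byte[] avroPayload(byte[] message) {
    checkedHeader(message);
    return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
  }

  // Validates length and magic byte; the returned buffer is positioned at the schema ID.
  private static ByteBuffer checkedHeader(byte[] message) {
    if (message.length < HEADER_LENGTH) {
      throw new IllegalArgumentException("Message shorter than the 5-byte prefix");
    }
    ByteBuffer buffer = ByteBuffer.wrap(message); // big-endian by default
    if (buffer.get() != MAGIC_BYTE) {
      throw new IllegalArgumentException("Unexpected magic byte");
    }
    return buffer;
  }

  public static void main(String[] args) {
    byte[] message = {0, 0, 0, 0, 42, 1, 2, 3};
    System.out.println(schemaId(message));
    System.out.println(Arrays.toString(avroPayload(message)));
  }
}
```

Skipping the prefix, as the PR does, ignores the schema ID entirely, which is why the README also recommends supplying the schema through `avro.schema.url` or `avro.schema.literal`.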



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387980822
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   @b-slim You appear to be correct, based on the source code: https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java. Let's say we implemented this as is, and later implemented the Schema Registry lookup using the same identifier. Who would that break? Serialized messages that point to a bogus Schema Registry instance, or serialized messages that happened to need 5 bytes at the front of the message but aren't from Confluent, where some clever dev figured out he could use Confluent instead of doing it the right way?
   
   The second case doesn't matter to me tbh. 
   
   The first case is concerning and should be handled. I would expect that we would catch when we can't find a schema and print out a warning, but no error. That would allow this case to continue working. But we would be making assumptions on the implementation of a feature in the future, which is always a crapshoot...
   
   To be clear, if we make sure documentation is clear on this outside of just these parameters, and @cricket007 agrees with it as a heavy Confluent user, then I'm fine with it. It feels like we've covered this problem enough to be in a good spot either way.



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387998774
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro bytes converter.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord1 = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecord1AsBytes;
+
+  /**
+   * Emulate confluent avro producer that add 4 magic bits (int) before value bytes. The int represents the schema ID from schema registry.
 
 Review comment:
   `Schema Registry` is a noun/product



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387996074
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
+It can be done by setting `"avro.serde.type"="confluent"` or `"avro.serde.type"="skip"` with `"avro.serde.skip.bytes"="5"`. It's recommended to set an avro schema via `"avro.schema.url"="http://hostname/SimpleDocument.avsc"` or `"avro.schema.literal"="{"type" : "record","name" : "SimpleRecord","..."}`. If both properties are set then `avro.schema.literal` has higher priority.
 
 Review comment:
   ```suggestion
   It can be done by setting `"avro.serde.type"="confluent"` or `"avro.serde.type"="skip"` with `"avro.serde.skip.bytes"="5"`. It's recommended to set an Avro schema via `"avro.schema.url"="http://hostname/SimpleDocument.avsc"` or `"avro.schema.literal"="{"type" : "record","name" : "SimpleRecord","..."}`. If both properties are set then `avro.schema.literal` has higher priority.
   ```



[GitHub] [hive] b-slim commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
b-slim commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-594901087
 
 
   @davidov541 thanks for the PR and the contribution. I left one important comment that needs some work.



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388080591
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro SerDe with variable bytes skipped.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecordConfluentBytes;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Use the KafkaAvroSerializer from Confluent to serialize the simpleRecord. 
+   */
+  @BeforeClass
+  public static void setUp() {
+    Map<String, String> config = Maps.newHashMap();
+    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
+    KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient());
+    avroSerializer.configure(config, false);
+    simpleRecordConfluentBytes = avroSerializer.serialize("temp", simpleRecord);
+  }
+
+  private void runConversionTest(KafkaSerDe.AvroBytesConverter conv, byte[] serializedSimpleRecord) { 
+    AvroGenericRecordWritable simpleRecordWritable = conv.getWritable(serializedSimpleRecord);
+
+    Assert.assertNotNull(simpleRecordWritable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecordWritable.getRecord().getClass());
+
+    SimpleRecord simpleRecordDeserialized = (SimpleRecord) simpleRecordWritable.getRecord();
+
+    Assert.assertNotNull(simpleRecordDeserialized);
+    Assert.assertEquals(simpleRecord, simpleRecordDeserialized);
+  }
+
+  /**
+   * Tests the default case of no skipped bytes per record works properly. 
+   */
+  @Test
+  public void convertWithAvroBytesConverter() {
+    // Since the serialized version was created by Confluent, lets remove the first five bytes to get the actual message.
+    byte[] simpleRecordWithNoOffset = Arrays.copyOfRange(simpleRecordConfluentBytes, 5, simpleRecordConfluentBytes.length);
+
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroBytesConverter conv = new KafkaSerDe.AvroBytesConverter(schema);
+    runConversionTest(conv, simpleRecordWithNoOffset);
+  }
+
+  /**
+   * Tests that the skip converter skips 5 bytes properly, which matches what Confluent needs.
+   */
+  @Test
+  public void convertWithConfluentAvroBytesConverter() {
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, 5);
+    runConversionTest(conv, simpleRecordConfluentBytes);
+  }
+
+  /**
+   * Tests that the skip converter skips a custom number of bytes properly.
+   */
+  @Test
+  public void convertWithCustomAvroSkipBytesConverter() {
+    int offset = 2;
+    // Remove all but two bytes of the five byte offset which Confluent adds, 
+    // to simulate a message with only 2 bytes in front of each message.
+    byte[] simpleRecordAsOffsetBytes = Arrays.copyOfRange(simpleRecordConfluentBytes, 5 - offset, simpleRecordConfluentBytes.length);
+
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, offset);
+    runConversionTest(conv, simpleRecordAsOffsetBytes);    
+  }
+
+  /**
+   * Test that when we skip more bytes than are in the message, we throw an exception properly.
+   */
+  @Test
+  public void skipBytesLargerThanMessageSizeConverter() {
+    // The simple record we are serializing is two strings, that combine to be 7 characters or 14 bytes.
+    // Adding in the 5 byte offset, we get 19 bytes. To make sure we go bigger than that, we are setting
+    // the offset to ten times that value. 
+    int offset = 190;
+
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, offset);
+
+    exception.expect(RuntimeException.class);
+    exception.expectMessage("org.apache.hadoop.hive.serde2.SerDeException: " + 
+      "Skip bytes value is larger than the message length.");
+    runConversionTest(conv, simpleRecordConfluentBytes);    
+  }
+
+  /**
+  * Test that we properly parse the converter type, no matter the casing.
+  */
+  @Test
+  public void bytesConverterTypeParseTest() {
+    Map<String, KafkaSerDe.BytesConverterType> testCases = new HashMap<String, KafkaSerDe.BytesConverterType>() {{
 
 Review comment:
   The double brace is actually the way this has to be written prior to Java 9: https://stackoverflow.com/questions/6802483/how-to-directly-initialize-a-hashmap-in-a-literal-way. Note that thread talks about some memory issues, but since this is being used for a small map in a test and not in production, it seems fine.
   
   Also, you're correct, we could use a parameterized test. I don't see any reason to use such a heavy mechanism for such a simple test, however. There's no need to do multiple tests per set of parameters, nor are there more than two parameters necessary. 
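
To make the trade-off concrete, here is a minimal sketch using only the JDK. `Kind` is a hypothetical stand-in for `KafkaSerDe.BytesConverterType`, and the class name is invented for illustration. It shows that double-brace initialization creates an anonymous subclass of `HashMap` (the source of the memory concerns in the linked thread), while a plain helper yields an ordinary `HashMap` with the same contents.

```java
import java.util.HashMap;
import java.util.Map;

class DoubleBraceSketch {
  // Hypothetical stand-in for KafkaSerDe.BytesConverterType.
  enum Kind { CONFLUENT, SKIP, NONE }

  // Double-brace initialization: an anonymous HashMap subclass with an instance initializer.
  static Map<String, Kind> doubleBrace() {
    return new HashMap<String, Kind>() {{
      put("confluent", Kind.CONFLUENT);
      put("SKIP", Kind.SKIP);
    }};
  }

  // Plain alternative that also works on Java 8 and adds no extra class.
  static Map<String, Kind> plain() {
    Map<String, Kind> cases = new HashMap<>();
    cases.put("confluent", Kind.CONFLUENT);
    cases.put("SKIP", Kind.SKIP);
    return cases;
  }

  public static void main(String[] args) {
    System.out.println(doubleBrace().getClass() == HashMap.class); // prints false
    System.out.println(plain().getClass() == HashMap.class);       // prints true
    System.out.println(doubleBrace().equals(plain()));             // prints true
  }
}
```

For a small map in a test either form behaves the same; the difference only matters where the extra class or a captured enclosing instance is a concern.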



[GitHub] [hive] b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387861610
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   ` public static Integer getInteger(String nm, Integer val) {
           String v = null;
           try {
               v = System.getProperty(nm);
           } catch (IllegalArgumentException | NullPointerException e) {
           }
           if (v != null) {
               try {
                   return Integer.decode(v);
               } catch (NumberFormatException e) {
               }
           }
           return val;
       }`



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387998699
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro bytes converter.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord1 = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecord1AsBytes;
+
+  /**
+   * Emulate confluent avro producer that add 4 magic bits (int) before value bytes. The int represents the schema ID from schema registry.
 
 Review comment:
   nit: `Confluent` is a proper noun (a company name), so it should be capitalized



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387931454
 
 

 ##########
 File path: kafka-handler/pom.xml
 ##########
 @@ -190,5 +207,27 @@
         </executions>
       </plugin>
     </plugins>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro-maven-plugin</artifactId>
+          <version>1.8.1</version>
 
 Review comment:
   Used the global avro.version variable.



[GitHub] [hive] davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-594289581
 
 
   @cricket007 @b-slim : Please take a look and let me know what you think.



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387995005
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
 
 Review comment:
   Links for Context - https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format
   
   However, there are *other* Registries, so I would like to make a point to make this as extensible as possible. 
   
   - https://github.com/hortonworks/registry
   - https://github.com/Apicurio/apicurio-registry/
   
   [Confluent's branding says `Confluent Schema Registry`](https://docs.confluent.io/current/schema-registry/index.html), so I think it's self-explanatory enough 
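
For context on the five bytes being skipped: per the wire-format documentation linked above, byte 0 is a magic byte (currently 0) and bytes 1 to 4 hold the schema ID as a 4-byte big-endian integer, followed by the Avro payload. The sketch below decodes that header. The class name and the validation choices are assumptions for illustration and are not part of the PR, which only skips the header rather than reading it.

```java
import java.nio.ByteBuffer;

class ConfluentHeaderSketch {
  static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

  // Reads the schema ID from a Confluent-framed message. ByteBuffer is big-endian by default.
  static int schemaId(byte[] message) {
    if (message.length < HEADER_LENGTH) {
      throw new IllegalArgumentException("Message is shorter than the 5-byte header.");
    }
    ByteBuffer buffer = ByteBuffer.wrap(message);
    byte magic = buffer.get();
    if (magic != 0) {
      throw new IllegalArgumentException("Unknown magic byte: " + magic);
    }
    return buffer.getInt();
  }

  public static void main(String[] args) {
    // Magic byte 0, schema ID 7, then a four-byte stand-in payload.
    byte[] message = {0, 0, 0, 0, 7, 6, 102, 111, 111};
    System.out.println(schemaId(message));              // prints 7
    System.out.println(message.length - HEADER_LENGTH); // prints 4
  }
}
```

Other registries may frame messages differently, which is the argument for keeping a generic `skip` option alongside the `confluent` preset.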



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387949896
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
 
 Review comment:
   That makes sense, @cricket007. So far Confluent has done a good job preventing backward compatibility issues, so I don't see a reason to assume that they will introduce them in the future, and guarding against that would come at the cost of a worse developer experience. I'd be fine leaving it as is.



[GitHub] [hive] cricket007 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-596019430
 
 
   I think that is a side effect of the Avro Maven Plugin on that configuration block... You can put `CharSequence` or `String`, I think, but the default is `Utf8`



[GitHub] [hive] davidov541 edited a comment on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 edited a comment on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-596573043
 
 
   @b-slim Pre-commit has finished successfully, with only 2 license warnings that are unrelated to this changeset, a mvn install failure due to there being changes in multiple components only, and an unrelated test failure. I believe that this changeset is now ready to commit, once I get your official +1 on it.
   
   https://builds.apache.org/job/PreCommit-HIVE-Build/21029/



[GitHub] [hive] cricket007 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-595026999
 
 
   You may want to rebase? I see your commit for 14888 in here 



[GitHub] [hive] b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
b-slim commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387873547
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   @davidov541 did you read the comment above? The function you are using, `Integer.getInteger`, reads from the JVM system properties; please see the call to `System.getProperty(nm)` in its implementation. 
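
The pitfall can be reproduced with the JDK alone. In the sketch below, `intProperty` is a hypothetical helper (not code from the PR) that parses the table property directly, while the `Integer.getInteger` call treats the property value `"5"` as the name of a JVM system property and so normally returns `null`.

```java
import java.util.Properties;

class GetIntegerSketch {
  // Hypothetical helper: reads an integer table property, falling back to a default.
  static int intProperty(Properties tbl, String name, int defaultValue) {
    String raw = tbl.getProperty(name);
    return raw == null ? defaultValue : Integer.parseInt(raw.trim());
  }

  public static void main(String[] args) {
    Properties tbl = new Properties();
    tbl.setProperty("avro.serde.skip.bytes", "5");

    // Looks up a system property named "5"; the result is null unless such a property is set.
    System.out.println(Integer.getInteger(tbl.getProperty("avro.serde.skip.bytes")));

    // Parses the table property value itself.
    System.out.println(intProperty(tbl, "avro.serde.skip.bytes", 0)); // prints 5
  }
}
```

`Integer.parseInt` throws `NumberFormatException` on malformed input, so a real implementation would need to decide whether to surface that error or fall back to a default.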
   



[GitHub] [hive] davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on issue #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#issuecomment-596028873
 
 
   @cricket007 That makes sense. I'm still concerned that the schema is present at all in the build. I'm currently rebuilding some things due to mistakes on my end, but I expect that this issue prevents anyone from using an Avro schema with the name "SimpleRecord" at all, since it will use the one built into the system, instead of theirs. I'll test it, but I do think that's something we can solve separately, since the schema management piece is separate from the skipping of bytes that this PR implements.



[GitHub] [hive] davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
davidov541 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387866087
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterProperty = tbl.getProperty(AvroSerdeUtils
+                                                            .AvroTableProperties.AVRO_SERDE_TYPE
+                                                            .getPropName(), BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    Integer avroSkipBytes = Integer.getInteger(tbl.getProperty(AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES
 
 Review comment:
   Yeah, we are basically getting the property name for the number of bytes to read, then getting the string value, then converting that to an integer. I think it might be easier to read with an intermediate variable. I'll add that.



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388001426
 
 

 ##########
 File path: kafka-handler/README.md
 ##########
 @@ -50,6 +50,9 @@ ALTER TABLE
 SET TBLPROPERTIES (
   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
+
+If you use Confluent Avro serialzier/deserializer with Schema Registry you may want to remove 5 bytes from beginning that represents magic byte + schema ID from registry.
+It can be done by setting `"avro.serde.type"="confluent"` or `"avro.serde.type"="skip"` with `"avro.serde.skip.bytes"="5"`. It's recommended to set an avro schema via `"avro.schema.url"="http://hostname/SimpleDocument.avsc"` or `"avro.schema.literal"="{"type" : "record","name" : "SimpleRecord","..."}`. If both properties are set then `avro.schema.literal` has higher priority.
 
 Review comment:
   Is it still recommended to set the literal when using `confluent`?
   
   Link https://github.com/confluentinc/schema-registry/pull/686



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r388078182
 
 

 ##########
 File path: kafka-handler/src/test/org/apache/hadoop/hive/kafka/AvroBytesConverterTest.java
 ##########
 @@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.Maps;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test class for Hive Kafka Avro SerDe with variable bytes skipped.
+ */
+public class AvroBytesConverterTest {
+  private static SimpleRecord simpleRecord = SimpleRecord.newBuilder().setId("123").setName("test").build();
+  private static byte[] simpleRecordConfluentBytes;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Use the KafkaAvroSerializer from Confluent to serialize the simpleRecord. 
+   */
+  @BeforeClass
+  public static void setUp() {
+    Map<String, String> config = Maps.newHashMap();
+    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
+    KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(new MockSchemaRegistryClient());
+    avroSerializer.configure(config, false);
+    simpleRecordConfluentBytes = avroSerializer.serialize("temp", simpleRecord);
+  }
+
+  private void runConversionTest(KafkaSerDe.AvroBytesConverter conv, byte[] serializedSimpleRecord) { 
+    AvroGenericRecordWritable simpleRecordWritable = conv.getWritable(serializedSimpleRecord);
+
+    Assert.assertNotNull(simpleRecordWritable);
+    Assert.assertEquals(SimpleRecord.class, simpleRecordWritable.getRecord().getClass());
+
+    SimpleRecord simpleRecordDeserialized = (SimpleRecord) simpleRecordWritable.getRecord();
+
+    Assert.assertNotNull(simpleRecordDeserialized);
+    Assert.assertEquals(simpleRecord, simpleRecordDeserialized);
+  }
+
+  /**
+   * Tests the default case of no skipped bytes per record works properly. 
+   */
+  @Test
+  public void convertWithAvroBytesConverter() {
+    // Since the serialized version was created by Confluent, lets remove the first five bytes to get the actual message.
+    byte[] simpleRecordWithNoOffset = Arrays.copyOfRange(simpleRecordConfluentBytes, 5, simpleRecordConfluentBytes.length);
+
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroBytesConverter conv = new KafkaSerDe.AvroBytesConverter(schema);
+    runConversionTest(conv, simpleRecordWithNoOffset);
+  }
+
+  /**
+   * Tests that the skip converter skips 5 bytes properly, which matches what Confluent needs.
+   */
+  @Test
+  public void convertWithConfluentAvroBytesConverter() {
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, 5);
+    runConversionTest(conv, simpleRecordConfluentBytes);
+  }
+
+  /**
+   * Tests that the skip converter skips a custom number of bytes properly.
+   */
+  @Test
+  public void convertWithCustomAvroSkipBytesConverter() {
+    int offset = 2;
+    // Remove all but two bytes of the five byte offset which Confluent adds, 
+    // to simulate a message with only 2 bytes in front of each message.
+    byte[] simpleRecordAsOffsetBytes = Arrays.copyOfRange(simpleRecordConfluentBytes, 5 - offset, simpleRecordConfluentBytes.length);
+
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, offset);
+    runConversionTest(conv, simpleRecordAsOffsetBytes);    
+  }
+
+  /**
+   * Test that when we skip more bytes than are in the message, we throw an exception properly.
+   */
+  @Test
+  public void skipBytesLargerThanMessageSizeConverter() {
+    // The simple record we are serializing is two strings, that combine to be 7 characters or 14 bytes.
+    // Adding in the 5 byte offset, we get 19 bytes. To make sure we go bigger than that, we are setting
+    // the offset to ten times that value. 
+    int offset = 190;
+
+    Schema schema = SimpleRecord.getClassSchema();
+    KafkaSerDe.AvroSkipBytesConverter conv = new KafkaSerDe.AvroSkipBytesConverter(schema, offset);
+
+    exception.expect(RuntimeException.class);
+    exception.expectMessage("org.apache.hadoop.hive.serde2.SerDeException: " + 
+      "Skip bytes value is larger than the message length.");
+    runConversionTest(conv, simpleRecordConfluentBytes);    
+  }
+
+  /**
+  * Test that we properly parse the converter type, no matter the casing.
+  */
+  @Test
+  public void bytesConverterTypeParseTest() {
+    Map<String, KafkaSerDe.BytesConverterType> testCases = new HashMap<String, KafkaSerDe.BytesConverterType>() {{
 
 Review comment:
   Nit: this uses double-brace initialization.
   
   This could be implemented as a parameterized test instead.
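   
   A minimal sketch of the table-driven shape this could take, written without JUnit so it stands alone. The enum mirrors the `BytesConverterType` from this PR, except that it upper-cases with `Locale.ROOT` (an assumption on my part, so the result does not depend on the default locale). The class name, the case table, and `countFailures` are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-alone sketch; a real test would likely use JUnit's Parameterized runner.
public class BytesConverterTypeParseSketch {
  // Mirrors the enum added to KafkaSerDe in this PR (Locale.ROOT is an addition).
  enum BytesConverterType {
    CONFLUENT, SKIP, NONE;

    static BytesConverterType fromString(String value) {
      try {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
      } catch (Exception e) {
        // Unknown or null values fall back to NONE, as in the PR.
        return NONE;
      }
    }
  }

  /** Builds the input/expected table with plain put() calls instead of double-brace initialization. */
  static Map<String, BytesConverterType> cases() {
    Map<String, BytesConverterType> cases = new LinkedHashMap<>();
    cases.put("confluent", BytesConverterType.CONFLUENT);
    cases.put("CONFLUENT", BytesConverterType.CONFLUENT);
    cases.put(" Skip ", BytesConverterType.SKIP);
    cases.put("none", BytesConverterType.NONE);
    cases.put("unknown", BytesConverterType.NONE);
    return cases;
  }

  /** Returns the number of cases whose parsed value differs from the expected one. */
  static int countFailures() {
    int failures = 0;
    for (Map.Entry<String, BytesConverterType> testCase : cases().entrySet()) {
      if (BytesConverterType.fromString(testCase.getKey()) != testCase.getValue()) {
        failures++;
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    System.out.println("failures: " + countFailures());
  }
}
```

   With a parameterized runner, each row of `cases()` would become its own test case, so a single failing input is reported by name rather than hiding behind the first failed assertion in a loop.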



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387867610
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -369,6 +402,26 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
     }
   }
 
+    /**
+     * The converter reads bytes from kafka message and skip first @skipBytes from beginning.
+     *
+     * For example:
+     *       Confluent kafka producer add 5 magic bytes that represents Schema ID as Integer to the message.
 
 Review comment:
   Nit: Confluent doesn't have its own version of a Kafka producer.
   
   It's the Avro serializer that adds the bytes.
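   
   For context, a minimal sketch of the framing the serializer produces, assuming the documented Confluent wire format: one magic byte `0`, then a 4-byte big-endian schema ID, then the Avro payload. The class and method names are hypothetical and not part of this PR:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not part of the PR: illustrates the 5-byte prefix only.
public class ConfluentFramingSketch {
  static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

  /** Reads the schema ID from the prefix (ByteBuffer is big-endian by default). */
  static int schemaId(byte[] message) {
    requireHeader(message);
    ByteBuffer buffer = ByteBuffer.wrap(message);
    byte magic = buffer.get();
    if (magic != 0) {
      throw new IllegalArgumentException("Unexpected magic byte: " + magic);
    }
    return buffer.getInt();
  }

  /** Returns the Avro payload that follows the prefix. */
  static byte[] payload(byte[] message) {
    requireHeader(message);
    return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
  }

  private static void requireHeader(byte[] message) {
    if (message.length < HEADER_LENGTH) {
      throw new IllegalArgumentException("Message is shorter than the 5-byte header");
    }
  }

  public static void main(String[] args) {
    // Magic byte 0, schema ID 42, then four payload bytes.
    byte[] message = {0, 0, 0, 0, 42, 0x06, 0x31, 0x32, 0x33};
    System.out.println(schemaId(message));       // prints 42
    System.out.println(payload(message).length); // prints 4
  }
}
```

   Skipping a fixed five bytes, as the converter in this PR does, corresponds to `payload` above; the schema ID is discarded rather than looked up in a registry.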



[GitHub] [hive] cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format

Posted by GitBox <gi...@apache.org>.
cricket007 commented on a change in pull request #933: HIVE-21218: Adding support for Confluent Kafka Avro message format
URL: https://github.com/apache/hive/pull/933#discussion_r387997714
 
 

 ##########
 File path: kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
 ##########
 @@ -133,12 +134,40 @@
       Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
       Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
       LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      bytesConverter = new AvroBytesConverter(schema);
+      bytesConverter = getByteConverterForAvroDelegate(schema, tbl);
     } else {
       bytesConverter = new BytesWritableConverter();
     }
   }
 
+  enum BytesConverterType {
+    CONFLUENT,
+    SKIP,
+    NONE;
+
+    static BytesConverterType fromString(String value) {
+      try {
+        return BytesConverterType.valueOf(value.trim().toUpperCase());
+      } catch (Exception e){
+        return NONE;
+      }
+    }
+  }
+
+  BytesConverter getByteConverterForAvroDelegate(Schema schema, Properties tbl) {
+    String avroBytesConverterPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_TYPE.getPropName();
+    String avroBytesConverterProperty = tbl.getProperty(avroBytesConverterPropertyName, 
+      BytesConverterType.NONE.toString());
+    BytesConverterType avroByteConverterType = BytesConverterType.fromString(avroBytesConverterProperty);
+    String avroSkipBytesPropertyName = AvroSerdeUtils.AvroTableProperties.AVRO_SERDE_SKIP_BYTES.getPropName();
+    Integer avroSkipBytes = Integer.parseInt(tbl.getProperty(avroSkipBytesPropertyName));
 
 Review comment:
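   Nit: wrap this in a `catch (NumberFormatException e)`. `Integer.parseInt` throws it for a non-numeric value and also when the property is unset (null).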
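   
   A minimal sketch of one way to guard the parse. The property name comes from the README change in this PR; the helper name `parseSkipBytes` and the choice of `IllegalArgumentException` are assumptions, and the SerDe may well prefer a different exception type:

```java
import java.util.Properties;

// Hypothetical helper, not part of the PR.
public class SkipBytesPropertySketch {
  // Property name taken from the README change in this PR.
  static final String SKIP_BYTES_PROPERTY = "avro.serde.skip.bytes";

  /** Parses the skip-bytes table property, failing with a descriptive message. */
  static int parseSkipBytes(Properties tbl) {
    String raw = tbl.getProperty(SKIP_BYTES_PROPERTY);
    if (raw == null) {
      throw new IllegalArgumentException("Property " + SKIP_BYTES_PROPERTY + " is not set");
    }
    int skipBytes;
    try {
      skipBytes = Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          "Value of " + SKIP_BYTES_PROPERTY + " is not an integer: " + raw, e);
    }
    if (skipBytes < 0) {
      throw new IllegalArgumentException(SKIP_BYTES_PROPERTY + " must not be negative: " + raw);
    }
    return skipBytes;
  }

  public static void main(String[] args) {
    Properties tbl = new Properties();
    tbl.setProperty(SKIP_BYTES_PROPERTY, "5");
    System.out.println(parseSkipBytes(tbl)); // prints 5
  }
}
```

   Parsing only when the converter type is `SKIP` would also avoid the failure for `confluent` and `none` tables that never set the property.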
