Posted to dev@storm.apache.org by dossett <gi...@git.apache.org> on 2016/01/27 17:54:07 UTC

[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

GitHub user dossett opened a pull request:

    https://github.com/apache/storm/pull/1052

    STORM-1504: Add Serializer and instruction for AvroGenericRecordBolt

    This was new for me, so I have some questions:
    
    - Would it be better to automatically register this serializer instead of providing developer instructions (see the registration sketch below)?
    - What's the best practice for exception handling in a serializer? Throwing a RuntimeException seemed like the best option.
    
    Provided this PR is accepted, I would also vote for backporting this to 1.0.0 since the AvroGenericRecordBolt is unusable without it in a multi-worker topology.
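    For reference, a minimal sketch of the kind of manual registration the developer instructions would describe. The class and package names are taken from diffs later in this thread and may not match the merged code exactly; the README change in this PR also adds AvroGenericRecordBolt.addAvroKryoSerializations(conf) as a convenience wrapper.
    
    ```
    import org.apache.avro.generic.GenericData;
    import org.apache.storm.Config;
    import org.apache.storm.hdfs.avro.GenericAvroSerializer;
    
    public class AvroKryoRegistrationSketch {
        public static void main(String[] args) {
            Config conf = new Config();
            // Tell Kryo to use the Avro-aware serializer for generic records so that
            // tuples survive worker-to-worker transfer in a multi-worker topology.
            conf.registerSerialization(GenericData.Record.class, GenericAvroSerializer.class);
            // ... build the topology and submit it with this conf ...
        }
    }
    ```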


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dossett/storm STORM-1504

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1052.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1052
    
----
commit eda0bfa07abe07d2b77a42e3f9496ac55959c909
Author: Aaron Dossett <aa...@target.com>
Date:   2016-01-27T16:49:19Z

    STORM-1504: Add Serializer and instruction for AvroGenericRecordBolt

----



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51475643
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java ---
    @@ -17,8 +17,9 @@
      */
     package org.apache.storm.hdfs.bolt;
     
    +import org.apache.avro.generic.GenericData;
     import org.apache.storm.Config;
    -import org.apache.storm.Constants;
    +import org.apache.storm.hdfs.avro.AbstractAvroSerializer;
    --- End diff --
    
    The deletes look OK, but these new imports are not used. Can we remove them or revert the file?



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51231057
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/DefinedAvroSchemaRegistry.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import com.google.common.collect.BiMap;
    +import com.google.common.collect.HashBiMap;
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaNormalization;
    +
    +import java.security.NoSuchAlgorithmException;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class DefinedAvroSchemaRegistry implements AvroSchemaRegistry{
    +
    +    private final static String FP_ALGO = "SHA-256";
    --- End diff --
    
    This will generate a 32-byte fingerprint. CRC-64-AVRO should be good enough, I think. At most, the user can override the algorithm themselves.
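    For comparison, a small sketch of the two fingerprint sizes using Avro's SchemaNormalization (the example schema is made up):
    
    ```
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaNormalization;
    
    import java.security.NoSuchAlgorithmException;
    
    public class FingerprintSizeSketch {
        public static void main(String[] args) throws NoSuchAlgorithmException {
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":"
                + "[{\"name\":\"id\",\"type\":\"long\"}]}");
    
            // SHA-256 parsing fingerprint: 32 bytes sent alongside every record.
            byte[] sha256 = SchemaNormalization.parsingFingerprint("SHA-256", schema);
    
            // CRC-64-AVRO fingerprint: 8 bytes, returned directly as a long.
            long crc64 = SchemaNormalization.parsingFingerprint64(schema);
    
            System.out.println(sha256.length + " bytes vs 8 bytes (0x" + Long.toHexString(crc64) + ")");
        }
    }
    ```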



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51230998
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/DefinedAvroSchemaRegistry.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import com.google.common.collect.BiMap;
    +import com.google.common.collect.HashBiMap;
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaNormalization;
    +
    +import java.security.NoSuchAlgorithmException;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class DefinedAvroSchemaRegistry implements AvroSchemaRegistry{
    +
    +    private final static String FP_ALGO = "SHA-256";
    +    final BiMap<String, Schema> fingerprint2schemaMap;
    +    final BiMap<Schema, String> schema2fingerprintMap;
    +
    +    DefinedAvroSchemaRegistry(final Map<String, Schema> definedSchemas) {
    +        fingerprint2schemaMap = HashBiMap.create(definedSchemas);
    +        schema2fingerprintMap = fingerprint2schemaMap.inverse();
    +    }
    +
    +    DefinedAvroSchemaRegistry(List<Schema> schemaList) throws NoSuchAlgorithmException {
    +        fingerprint2schemaMap = HashBiMap.create();
    +        for (Schema schema : schemaList) {
    +            String fingerPrint = new String(SchemaNormalization.parsingFingerprint(FP_ALGO, schema));
    --- End diff --
    
    I am not sure about constructing a String out of non-text bytes and then reading it back. What you write may be different from what you read back due to encoding/decoding differences. Can you use a byte[] as the fingerprint type instead of a String? Or, if you really want to use a String as the key, the fingerprint can be Base64 encoded and returned as a String.
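    A sketch of the Base64 option, using the commons-codec Base64 that later commits in this thread import (the fingerprintKey helper name is made up):
    
    ```
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaNormalization;
    import org.apache.commons.codec.binary.Base64;
    
    import java.security.NoSuchAlgorithmException;
    
    public class FingerprintKeySketch {
        // Base64-encode the raw fingerprint bytes so the String key round-trips
        // safely, independent of the platform's default charset.
        static String fingerprintKey(Schema schema) throws NoSuchAlgorithmException {
            byte[] raw = SchemaNormalization.parsingFingerprint("SHA-256", schema);
            return Base64.encodeBase64String(raw);
        }
    }
    ```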



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-178645013
  
    @dossett yes I have been able to reproduce the error, and it appears to somehow be related to this pull request.
    
    ```
    mvn clean install -DskipTests
    cd examples/storm-starter
    mvn clean test
    ```
    
    The above works on master but fails 100% of the time with this pull request. I see some ClassDef-like issues, so my guess is that the dependencies are somehow messed up, probably with log4j.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-178649925
  
    @dossett I diffed the output of `mvn dependency:tree` between master and this pull request, just in storm-starter, and this is what showed up.
    
    ```
    -[INFO] |  \- org.apache.hadoop:hadoop-auth:jar:2.6.1:compile
    +[INFO] |  +- org.apache.hadoop:hadoop-auth:jar:2.6.1:compile
    +[INFO] |  \- io.confluent:kafka-avro-serializer:jar:1.0:compile
    +[INFO] |     \- io.confluent:kafka-schema-registry-client:jar:1.0:compile
    +[INFO] |        \- org.slf4j:slf4j-log4j12:jar:1.6.6:compile
    ```
    
    It looks like we added a dependency on org.slf4j:slf4j-log4j12:jar:1.6.6:compile. I think if you exclude that dependency it should work fine.
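    For illustration, the kind of exclusion being suggested, assuming the Confluent dependency is declared directly in storm-hdfs's pom.xml:
    
    ```
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    ```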



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51171090
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericSerializer.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericContainer;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.io.BinaryEncoder;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.io.EncoderFactory;
    +
    +import java.io.IOException;
    +
    +//Generously adapted from:
    +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
    +//Which has as an ASL2.0 license
    +public class AvroGenericSerializer extends Serializer<GenericContainer> {
    +    @Override
    +    public void write(Kryo kryo, Output output, GenericContainer record) {
    +        output.writeString(record.getSchema().toString());
    +        GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
    +
    +        BinaryEncoder encoder = EncoderFactory
    +                .get()
    +                .directBinaryEncoder(output, null);
    +        try {
    +            writer.write(record, encoder);
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
    +        Schema theSchema = new Schema.Parser().parse(input.readString());
    --- End diff --
    
    I pushed a commit that includes a lot of the above, but is not complete.
    
    @revans2 I included support for Confluent's registry.  Some basic testing in our (Target's) dev environments seemed good.
    @abhishekagarwal87 I haven't used BiMaps before, so I'm sure that part could be greatly improved.
    
    Still to be done:
    - make the unit tests more robust and parameterized.
    - add capability to inject different AvroSchemaRegistry implementations into the generic serializer, which might be challenging



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-178734184
  
    Looks good. +1



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-178648710
  
    Thanks @revans2, I will check it out.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51231410
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSchemaRegistry.java ---
    @@ -0,0 +1,74 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    +import org.apache.avro.Schema;
    +
    +import java.io.IOException;
    +
    +/**
    + *
    + */
    +public class ConfluentAvroSchemaRegistry implements AvroSchemaRegistry {
    +
    +    SchemaRegistryClient theClient;
    +    final String url;
    +
    +    public ConfluentAvroSchemaRegistry(String url) {
    +        this.url = url;
    +    }
    +
    +    @Override
    +    public String getFingerprint(Schema schema) {
    +        if (theClient == null)
    +        {
    +            theClient = initializeClient();
    +        }
    +        String subject = schema.getName();
    +        final int version;
    +        try {
    +            version = theClient.register(subject, schema);
    --- End diff --
    
    is it a schema version or schema identifier? 



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-176801519
  
    Thanks everyone for the feedback on the code.  I am beginning to appreciate some of the challenges of serialization; in particular, whatever serializer is registered with Kryo can only be instantiated with a default constructor, so my naive implementation, which relies on initialization via constructors in some spots, will not work. The meaning of @revans2's comments about reading data from a special file in the jar is also becoming clearer to me -- that's a way to get information needed for initialization without relying on constructor parameters.  I'll take another pass at cleaning up at least some of these issues and fixing/addressing comments.  There's some excitement here about using our Confluent registry to speed up serialization, so I really want to make this work.
    
    These are some new corners of Java for me, so in the worst-case scenario I'll simply have learned a lot.  Thanks everyone!
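    For reference, a minimal sketch of the "special file in the jar" idea: a serializer created through its default constructor can still pull configuration from a resource bundled in the topology jar (the resource name and helper are hypothetical):
    
    ```
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    
    public class JarResourceSketch {
        // Read a small config file bundled in the topology jar; no constructor
        // parameters are needed, so this works with Kryo's default-constructor rule.
        static String readJarResource(String name) throws IOException {
            InputStream in = JarResourceSketch.class.getClassLoader().getResourceAsStream(name);
            if (in == null) {
                throw new IOException("Resource not found on classpath: " + name);
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(in, StandardCharsets.UTF_8))) {
                StringBuilder contents = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    contents.append(line).append('\n');
                }
                return contents.toString();
            }
        }
    }
    ```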



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-177263999
  
    @revans2 @abhishekagarwal87 Addressed your comments and made the serializer scheme usable by Kryo.  Removed the ConfluentAvroSerializer unit test because I cannot yet get a usable, test-local schema registry up and running, but it does work fine in my dev environment.
    
    @abhishekagarwal87 I could not compile with that shaded package name, but I am also not an advanced user of shade, so I could be missing something.  Also, I left the GenericSerializer as the default for now.  I would rather have a slow default that works and then guide users to better options in the documentation, but I'm open to counter-arguments.
    
    Still to do, based on additional feedback:
    - refine unit tests one more time
    - add documentation
    
    Thanks!



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51082690
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericSerializer.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericContainer;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.io.BinaryEncoder;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.io.EncoderFactory;
    +
    +import java.io.IOException;
    +
    +//Generously adapted from:
    +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
    +//Which has as an ASL2.0 license
    +public class AvroGenericSerializer extends Serializer<GenericContainer> {
    +    @Override
    +    public void write(Kryo kryo, Output output, GenericContainer record) {
    +        output.writeString(record.getSchema().toString());
    +        GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
    +
    +        BinaryEncoder encoder = EncoderFactory
    +                .get()
    +                .directBinaryEncoder(output, null);
    +        try {
    +            writer.write(record, encoder);
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
    +        Schema theSchema = new Schema.Parser().parse(input.readString());
    --- End diff --
    
    The approach sounds right, though I would suggest not using the above version of the schema registry as the default one, as it is costly.
    ```
    class GenericAvroSchemaRegistry2 implements AvroSchemaRegistry {
        // Populated while building the topology; it should be serializable.
        private BiMap<String, Schema> schemaMap;
    
        public String getKey(Schema schema) {
            return schemaMap.inverse().get(schema);
        }
    
        public Schema getSchema(String key) {
            return schemaMap.get(key);
        }
    }
    ```
    Multiple implementations of the registry can be shipped. However, the default one should be the safest option to use.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51127995
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericSerializer.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericContainer;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.io.BinaryEncoder;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.io.EncoderFactory;
    +
    +import java.io.IOException;
    +
    +//Generously adapted from:
    +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
    +//Which has as an ASL2.0 license
    +public class AvroGenericSerializer extends Serializer<GenericContainer> {
    +    @Override
    +    public void write(Kryo kryo, Output output, GenericContainer record) {
    +        output.writeString(record.getSchema().toString());
    +        GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
    +
    +        BinaryEncoder encoder = EncoderFactory
    +                .get()
    +                .directBinaryEncoder(output, null);
    +        try {
    +            writer.write(record, encoder);
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
    +        Schema theSchema = new Schema.Parser().parse(input.readString());
    --- End diff --
    
    @dossett Yes, you got exactly what I was thinking.  If you do have code to support an open source schema registry, I would love to see that included in here too.  And changing key to fingerprint sounds like a better name to me.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51045767
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericSerializer.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericContainer;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.io.BinaryEncoder;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.io.EncoderFactory;
    +
    +import java.io.IOException;
    +
    +//Generously adapted from:
    +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
    +//Which has as an ASL2.0 license
    +public class AvroGenericSerializer extends Serializer<GenericContainer> {
    +    @Override
    +    public void write(Kryo kryo, Output output, GenericContainer record) {
    +        output.writeString(record.getSchema().toString());
    +        GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
    +
    +        BinaryEncoder encoder = EncoderFactory
    +                .get()
    +                .directBinaryEncoder(output, null);
    +        try {
    +            writer.write(record, encoder);
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
    +        Schema theSchema = new Schema.Parser().parse(input.readString());
    --- End diff --
    
    The problem here is that the schema is large (JSON) and the data is small (no tags, which is why you need the schema).  If we send the schema each time we send the data, we are wasting a lot of resources.  The only real way around this is to have a cheap way to get the schema while sending minimal data, ideally also caching the parsed schema so we don't have to parse it each time.
    
    The best way I know of would be to have an external schema registry and send a reference to the original schema with the data. That is what we do at Yahoo, but our registry is unlikely to ever be open sourced (sorry about that, not my code).
    
    My proposal would be to create a SchemaRegistry API that lets you do something like:
    
    ```
    public interface AvroSchemaRegistry {
        public String getKey(Schema schema);
    
        public Schema getSchema(String key);
    }
    ```
    
    We could then have two default implementations, or even a hybrid one.  The generic one, which would always work, would do exactly what this code does: turn the schema into a string and then deserialize it on the other side.  You could do some caching if you wanted to.
    
    A cheaper version would not serialize/deserialize the schema each time, but go off of a checksum, like Avro RPC does.  It could check whether a special schema file exists in the topology jar that matches the checksum (or possibly the full name + version, from which it computes the checksum).  It could then send the checksum, or the full name + version, instead of the full schema.  The read code would read the checksum/version, look up the file, read/parse/cache the schema, and return it.
    
    A hybrid would first try the cached copy, then the checksum/jar lookup, and if it could not find the schema, fall back to the full/slow schema.
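    For illustration, a compact sketch of that hybrid lookup order against the AvroSchemaRegistry interface proposed above (the hex CRC-64-AVRO fingerprint and the schemas/<fingerprint>.avsc layout inside the jar are assumptions):
    
    ```
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaNormalization;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class HybridAvroSchemaRegistry implements AvroSchemaRegistry {
        private final Map<String, Schema> cache = new ConcurrentHashMap<>();
    
        @Override
        public String getKey(Schema schema) {
            String fingerprint = Long.toHexString(SchemaNormalization.parsingFingerprint64(schema));
            // Cheap path: send only the fingerprint when the reader can recover the
            // schema from a file bundled in the topology jar; otherwise send full JSON.
            return hasBundledSchema(fingerprint) ? fingerprint : schema.toString();
        }
    
        @Override
        public Schema getSchema(String key) {
            Schema cached = cache.get(key);
            if (cached != null) {
                return cached;                            // 1. cached copy
            }
            Schema resolved = hasBundledSchema(key)
                    ? loadFromTopologyJar(key)            // 2. checksum -> schema file in the jar
                    : new Schema.Parser().parse(key);     // 3. fall back to the full/slow schema string
            cache.put(key, resolved);
            return resolved;
        }
    
        private boolean hasBundledSchema(String fingerprint) {
            return getClass().getResource("/schemas/" + fingerprint + ".avsc") != null;
        }
    
        private Schema loadFromTopologyJar(String fingerprint) {
            try (InputStream in = getClass().getResourceAsStream("/schemas/" + fingerprint + ".avsc")) {
                return new Schema.Parser().parse(in);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    ```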



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51349531
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    +import org.apache.avro.Schema;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +
    +/**
    + *
    + */
    +public class ConfluentAvroSerializer extends AbstractAvroSerializer {
    +
    +    private SchemaRegistryClient theClient;
    +    final private String url;
    +
    +    public ConfluentAvroSerializer() throws IOException {
    +        //Empty
    --- End diff --
    
    I'd like to, but it wasn't obvious how I could expose the global config to that class, since Kryo will only use the default constructor.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51349785
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    +import org.apache.avro.Schema;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +
    +/**
    + *
    + */
    +public class ConfluentAvroSerializer extends AbstractAvroSerializer {
    +
    +    private SchemaRegistryClient theClient;
    +    final private String url;
    +
    +    public ConfluentAvroSerializer() throws IOException {
    +        //Empty
    --- End diff --
    
    I did a quick search to find whether there are serializers which use the global configuration. Here is one example:
    https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/security/serialization/BlowfishTupleSerializer.java
    
    Here is how the serializers are initialized:
    https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/serialization/SerializationFactory.java#L168
    
    If you have a constructor with the same signature as in BlowfishTupleSerializer, I am sure it should work. Can you try?
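    For illustration, a minimal sketch of a Kryo serializer that takes its configuration from the Storm config map through that constructor signature (the config key is hypothetical):
    
    ```
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    
    import java.util.Map;
    
    public class ConfigAwareSerializerSketch extends Serializer<String> {
    
        private final String registryUrl;
    
        // Storm's SerializationFactory can hand the topology config to the serializer
        // through this constructor, as BlowfishTupleSerializer does.
        public ConfigAwareSerializerSketch(Kryo kryo, Map conf) {
            this.registryUrl = (String) conf.get("avro.schemaregistry.url");
        }
    
        @Override
        public void write(Kryo kryo, Output output, String value) {
            output.writeString(value); // a real implementation would use registryUrl here
        }
    
        @Override
        public String read(Kryo kryo, Input input, Class<String> type) {
            return input.readString();
        }
    }
    ```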



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-178174157
  
    The build is complaining that `external/storm-hdfs/src/test/resources/FixedAvroSerializer.config` does not have an Apache license header in it.
    
    Because it cannot have a header in it, please update the rat profile in pom.xml to exclude it.
    
    After that I am +1 for this change, with or without the small nit on the imports. 
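    For reference, the kind of apache-rat-plugin exclusion being asked for (the exact location of the rat profile in the pom may differ):
    
    ```
    <plugin>
        <groupId>org.apache.rat</groupId>
        <artifactId>apache-rat-plugin</artifactId>
        <configuration>
            <excludes>
                <exclude>**/src/test/resources/FixedAvroSerializer.config</exclude>
            </excludes>
        </configuration>
    </plugin>
    ```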



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51127135
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericSerializer.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericContainer;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.io.BinaryEncoder;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.io.EncoderFactory;
    +
    +import java.io.IOException;
    +
    +//Generously adapted from:
    +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
    +//Which has as an ASL2.0 license
    +public class AvroGenericSerializer extends Serializer<GenericContainer> {
    +    @Override
    +    public void write(Kryo kryo, Output output, GenericContainer record) {
    +        output.writeString(record.getSchema().toString());
    +        GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
    +
    +        BinaryEncoder encoder = EncoderFactory
    +                .get()
    +                .directBinaryEncoder(output, null);
    +        try {
    +            writer.write(record, encoder);
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
    +        Schema theSchema = new Schema.Parser().parse(input.readString());
    --- End diff --
    
    Thanks @abhishekagarwal87. I like that implementation when all the schemas are known in advance.  The use case I've been writing this for needs to support arbitrary schema evolution over the life of the topology, so I have not thought about it from that perspective.
    
    As to the default, I think the safest option is the one that doesn't require the user to do anything, but I would be interested in hearing other opinions on that as well.
    
    Thanks for the feedback everyone, I hope to have some code added to this PR soon.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51607023
  
    --- Diff: external/storm-hdfs/README.md ---
    @@ -315,6 +314,18 @@ An `org.apache.avro.Schema` object cannot be directly provided since it does not
     The AvroGenericRecordBolt expects to receive tuples containing an Avro GenericRecord that conforms to the provided
     schema.
     
    +To use this bolt you **must** register the appropriate Kryo serializers with your topology configuration.  A convenience
    +method is provided for this:
    +
    +```AvroGenericRecordBolt.addAvroKryoSerializations(conf);```
    +
    +By default Storm will use the ```GenericAvroSerializer``` to handle serialization.  This will work, but there are much 
    +faster options available if you can pre-define the schemas you will be using or utilize an external schema registry. An
    +implementation using the Confluent Schema Registry is provided, but others can be implemented and provided to Storm.
    +Please see the javadoc for classes in org.apache.storm.hdfs.avro for information about using the built-in options or
    +creating your own.
    +
    --- End diff --
    
    Something's off with the markdown formatting here, such that the content below is all rendered as code/preformatted.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1052



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51609059
  
    --- Diff: external/storm-hdfs/README.md ---
    @@ -315,6 +314,18 @@ An `org.apache.avro.Schema` object cannot be directly provided since it does not
     The AvroGenericRecordBolt expects to receive tuples containing an Avro GenericRecord that conforms to the provided
     schema.
     
    +To use this bolt you **must** register the appropriate Kryo serializers with your topology configuration.  A convenience
    +method is provided for this:
    +
    +```AvroGenericRecordBolt.addAvroKryoSerializations(conf);```
    +
    +By default Storm will use the ```GenericAvroSerializer``` to handle serialization.  This will work, but there are much 
    +faster options available if you can pre-define the schemas you will be using or utilize an external schema registry. An
    +implementation using the Confluent Schema Registry is provided, but others can be implemented and provided to Storm.
    +Please see the javadoc for classes in org.apache.storm.hdfs.avro for information about using the built-in options or
    +creating your own.
    +
    --- End diff --
    
    It looked good in IntelliJ, thanks for checking it here. Should be fixed now.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-178722021
  
    One minor issue with the markdown formatting in the README, but I'm +1 once that's fixed.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51350300
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    +import org.apache.avro.Schema;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +
    +/**
    + *
    + */
    +public class ConfluentAvroSerializer extends AbstractAvroSerializer {
    +
    +    private SchemaRegistryClient theClient;
    +    final private String url;
    +
    +    public ConfluentAvroSerializer() throws IOException {
    +        //Empty
    --- End diff --
    
    Yes we should document that a lot better.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51421641
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/DefinedAvroSerializer.java ---
    @@ -0,0 +1,62 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaNormalization;
    +import org.apache.commons.codec.binary.Base64;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +public class DefinedAvroSerializer extends AbstractAvroSerializer {
    +
    +    private final static String FP_ALGO = "CRC-64-AVRO";
    +    final Map<String, Schema> fingerprint2schemaMap = new HashMap<>();
    +    final Map<Schema, String> schema2fingerprintMap = new HashMap<>();
    +
    +    public DefinedAvroSerializer() throws IOException, NoSuchAlgorithmException {
    --- End diff --
    
    @abhishekagarwal87 In this case, getting the pre-defined schemas from a file felt like the right thing to do.  A user could specify an arbitrary number of schemas, each of arbitrary size, and that didn't feel right to put in the global Storm config.  Happy to hear other opinions on that, though.  Thanks!



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51063276
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericSerializer.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericContainer;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.io.BinaryEncoder;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.io.EncoderFactory;
    +
    +import java.io.IOException;
    +
    +//Generously adapted from:
    +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
    +//Which has as an ASL2.0 license
    +public class AvroGenericSerializer extends Serializer<GenericContainer> {
    +    @Override
    +    public void write(Kryo kryo, Output output, GenericContainer record) {
    +        output.writeString(record.getSchema().toString());
    +        GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
    +
    +        BinaryEncoder encoder = EncoderFactory
    +                .get()
    +                .directBinaryEncoder(output, null);
    +        try {
    +            writer.write(record, encoder);
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
    +        Schema theSchema = new Schema.Parser().parse(input.readString());
    --- End diff --
    
    Thanks for the thoughtful comments @revans2. I think I understand what you're describing but want to make sure.
    
    - A generic registry would just use the full schema text as the key (and parse it back into a schema on the other side), so the schema is always passed around with the record.
    
    ```
    class GenericAvroSchemaRegistry implements AvroSchemaRegistry {
        public String getKey(Schema schema) {
            return schema.toString();
        }

        public Schema getSchema(String key) {
            return new Schema.Parser().parse(key);
        }
    }
    ```
    
    - A jar-based registry approach could be used for schemas that are known in advance and worth persisting across the entire topology, but fall back on the generic approach above if an unknown schema is used (a rough sketch of that follows below).
    - You might choose to implement your own, which relies on your proprietary registry and is truly external to Storm.
    - At Target we could implement our own since we also use an external schema registry, albeit one that is already open sourced.
    
    Do I have the gist of it?
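
    FWIW, a minimal sketch of the "known schemas, generic fallback" idea might look like the following. It assumes the AvroSchemaRegistry interface sketched above (getKey/getSchema) plus a register method; the class name and the use of SchemaNormalization for the fingerprint are my own choices for the sketch, not something from the PR:

    ```
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaNormalization;

    import java.util.HashMap;
    import java.util.Map;

    //Sketch only: use a compact fingerprint for schemas known ahead of time and fall
    //back to shipping the full schema JSON for anything that was never registered.
    public class FallbackAvroSchemaRegistry implements AvroSchemaRegistry {
        private final Map<String, Schema> byKey = new HashMap<>();
        private final Map<Schema, String> bySchema = new HashMap<>();

        public void register(Schema schema) {
            String key = Long.toHexString(SchemaNormalization.parsingFingerprint64(schema));
            byKey.put(key, schema);
            bySchema.put(schema, key);
        }

        public String getKey(Schema schema) {
            String key = bySchema.get(schema);
            //Unknown schema: degrade to the generic behavior and send the full JSON
            return key != null ? key : schema.toString();
        }

        public Schema getSchema(String key) {
            Schema schema = byKey.get(key);
            return schema != null ? schema : new Schema.Parser().parse(key);
        }
    }
    ```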



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51229667
  
    --- Diff: external/storm-hdfs/pom.xml ---
    @@ -35,6 +35,13 @@
             </developer>
         </developers>
     
    +    <repositories>
    --- End diff --
    
    Indentation can be corrected here.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51350311
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    +import org.apache.avro.Schema;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +
    +/**
    + *
    + */
    +public class ConfluentAvroSerializer extends AbstractAvroSerializer {
    --- End diff --
    
    Can we either have real javadocs or remove the empty comment?



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51230736
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/DefinedAvroSchemaRegistry.java ---
    @@ -0,0 +1,59 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import com.google.common.collect.BiMap;
    --- End diff --
    
    Storm packs the guava classes in a shaded jar. Should you refer to those instead?
    e.g. org.apache.storm.guava.collect.BiMap.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51025554
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericSerializer.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericContainer;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.io.BinaryEncoder;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.io.EncoderFactory;
    +
    +import java.io.IOException;
    +
    +//Generously adapted from:
    +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
    +//Which has as an ASL2.0 license
    +public class AvroGenericSerializer extends Serializer<GenericContainer> {
    +    @Override
    +    public void write(Kryo kryo, Output output, GenericContainer record) {
    +        output.writeString(record.getSchema().toString());
    +        GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
    +
    +        BinaryEncoder encoder = EncoderFactory
    +                .get()
    +                .directBinaryEncoder(output, null);
    +        try {
    +            writer.write(record, encoder);
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
    +        Schema theSchema = new Schema.Parser().parse(input.readString());
    --- End diff --
    
    This can lead to higher GC depending on how frequently it is called.
    https://github.com/apache/avro/blob/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java#L107
    GenericDatumReader internally keeps a map of <ActualSchema, ExpectedSchema, ResolvingDecoder>. The ResolvingDecoder objects are expensive to create, and the map being used is an IdentityHashMap. So even though the schema may be the same, a different Schema object will lead to the creation of a different ResolvingDecoder.
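
    One possible mitigation (just a sketch from my side, not something in this PR) is to cache the parsed Schema per schema string, so the read path keeps handing GenericDatumReader the same Schema instance and its internal ResolvingDecoder cache actually gets hits:

    ```
    import org.apache.avro.Schema;

    import java.util.HashMap;
    import java.util.Map;

    //Sketch only: reuse parsed Schema objects across read() calls so the
    //identity-keyed decoder cache sees the same Schema instance each time.
    public class SchemaCache {
        private final Map<String, Schema> cache = new HashMap<>();

        public Schema parse(String schemaJson) {
            Schema schema = cache.get(schemaJson);
            if (schema == null) {
                schema = new Schema.Parser().parse(schemaJson);
                cache.put(schemaJson, schema);
            }
            return schema;
        }
    }
    ```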



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51350285
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java ---
    @@ -0,0 +1,18 @@
    +package org.apache.storm.hdfs.avro;
    --- End diff --
    
    Needs an Apache Header



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51356078
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/DefinedAvroSerializer.java ---
    @@ -0,0 +1,62 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaNormalization;
    +import org.apache.commons.codec.binary.Base64;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +public class DefinedAvroSerializer extends AbstractAvroSerializer {
    +
    +    private final static String FP_ALGO = "CRC-64-AVRO";
    +    final Map<String, Schema> fingerprint2schemaMap = new HashMap<>();
    +    final Map<Schema, String> schema2fingerprintMap = new HashMap<>();
    +
    +    public DefinedAvroSerializer() throws IOException, NoSuchAlgorithmException {
    --- End diff --
    
    This class still uses the default constructor. Apart from that, +1 non-binding from me.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51349431
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    +import org.apache.avro.Schema;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +
    +/**
    + *
    + */
    +public class ConfluentAvroSerializer extends AbstractAvroSerializer {
    +
    +    private SchemaRegistryClient theClient;
    +    final private String url;
    +
    +    public ConfluentAvroSerializer() throws IOException {
    +        //Empty
    --- End diff --
    
    Can the url simply be passed as storm configuration?



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51231303
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroGenericSerializer.java ---
    @@ -0,0 +1,79 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericContainer;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.io.BinaryEncoder;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.io.EncoderFactory;
    +
    +import java.io.IOException;
    +
    +//Generously adapted from:
    +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
    +//Which has as an ASL2.0 license
    +public class AvroGenericSerializer extends Serializer<GenericContainer> {
    +
    +    AvroSchemaRegistry registry = new GenericAvroSchemaRegistry();
    --- End diff --
    
    I would advise against using GenericAvroSchemaRegistry as the default. It looks to be very costly. You can try a test run with sufficient input load and verify it.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-177363506
  
    Code upmerged and commits squashed.  @abhishekagarwal87 Your pointer to the serialization examples was terrific, thank you!  It worked perfectly and really cleaned up that class.
    
    Documentation still to come, assuming the interface has stabilized.  Thanks!



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-178093239
  
    Documentation added and DefinedAvroSerializer renamed to FixedAvroSerializer.
    
    The travis failures seem unrelated to this change?



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51082926
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericSerializer.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericContainer;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.io.BinaryEncoder;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.io.EncoderFactory;
    +
    +import java.io.IOException;
    +
    +//Generously adapted from:
    +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
    +//Which has as an ASL2.0 license
    +public class AvroGenericSerializer extends Serializer<GenericContainer> {
    +    @Override
    +    public void write(Kryo kryo, Output output, GenericContainer record) {
    +        output.writeString(record.getSchema().toString());
    +        GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
    +
    +        BinaryEncoder encoder = EncoderFactory
    +                .get()
    +                .directBinaryEncoder(output, null);
    +        try {
    +            writer.write(record, encoder);
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
    +        Schema theSchema = new Schema.Parser().parse(input.readString());
    --- End diff --
    
    The key need not be supplied externally if the schema fingerprint is used as the key.
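
    For reference, Avro can derive such a fingerprint directly from the schema (a small sketch; the helper class is only illustrative):

    ```
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaNormalization;
    import org.apache.commons.codec.binary.Base64;

    import java.security.NoSuchAlgorithmException;

    //Sketch only: derive a stable key from the schema itself instead of supplying one.
    public class SchemaFingerprints {

        //64-bit CRC of the schema's parsing canonical form
        public static String crc64(Schema schema) {
            return Long.toHexString(SchemaNormalization.parsingFingerprint64(schema));
        }

        //Named algorithm, e.g. "CRC-64-AVRO", "MD5" or "SHA-256"
        public static String fingerprint(Schema schema, String algorithm) throws NoSuchAlgorithmException {
            return Base64.encodeBase64String(SchemaNormalization.parsingFingerprint(algorithm, schema));
        }
    }
    ```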



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-178672701
  
    @revans2 Spot on, I just pushed that change. Thanks for catching that. If the travis builds pass, I will commit today.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51349470
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    +import org.apache.avro.Schema;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +
    +/**
    + *
    + */
    +public class ConfluentAvroSerializer extends AbstractAvroSerializer {
    +
    +    private SchemaRegistryClient theClient;
    +    final private String url;
    +
    +    public ConfluentAvroSerializer() throws IOException {
    +        //Empty
    +        InputStream in = this.getClass().getClassLoader().getResourceAsStream("ConfluentAvroSerializer.config");
    +        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    +        url = reader.readLine();
    +    }
    +
    +    @Override
    +    public String getFingerprint(Schema schema) {
    +        if (theClient == null)
    +        {
    +            theClient = initializeClient();
    +        }
    +        String subject = schema.getName();
    +        final int guid;
    +        try {
    +            guid = theClient.register(subject, schema);
    +            System.out.println("GUID: [" + guid + "]");
    --- End diff --
    
    Oops, thanks.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51349345
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    +import org.apache.avro.Schema;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +
    +/**
    + *
    + */
    +public class ConfluentAvroSerializer extends AbstractAvroSerializer {
    +
    +    private SchemaRegistryClient theClient;
    +    final private String url;
    +
    +    public ConfluentAvroSerializer() throws IOException {
    +        //Empty
    +        InputStream in = this.getClass().getClassLoader().getResourceAsStream("ConfluentAvroSerializer.config");
    +        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    +        url = reader.readLine();
    +    }
    +
    +    @Override
    +    public String getFingerprint(Schema schema) {
    +        if (theClient == null)
    +        {
    +            theClient = initializeClient();
    +        }
    +        String subject = schema.getName();
    +        final int guid;
    +        try {
    +            guid = theClient.register(subject, schema);
    +            System.out.println("GUID: [" + guid + "]");
    --- End diff --
    
    This should be removed.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51426989
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/DefinedAvroSerializer.java ---
    @@ -0,0 +1,62 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.avro;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaNormalization;
    +import org.apache.commons.codec.binary.Base64;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +public class DefinedAvroSerializer extends AbstractAvroSerializer {
    +
    +    private final static String FP_ALGO = "CRC-64-AVRO";
    +    final Map<String, Schema> fingerprint2schemaMap = new HashMap<>();
    +    final Map<Schema, String> schema2fingerprintMap = new HashMap<>();
    +
    +    public DefinedAvroSerializer() throws IOException, NoSuchAlgorithmException {
    --- End diff --
    
    You can provide a static method like DefinedAvroSerializer.registerSchema(conf, schema):
    ```
    public static void registerSchema(Map conf, Schema schema) {
        Map<String, String> schemaMap = (Map<String, String>) conf.get("topology.avro.schemas");
        if (null == schemaMap) {
            schemaMap = new HashMap<>();
        }
        schemaMap.put(getFingerprint(schema), schema.toString());
        conf.put("topology.avro.schemas", schemaMap);
    }
    ```
    
    In the constructor, you can check whether the key "topology.avro.schemas" is present. If not, fall back to the method you are already using.
    That is one suggestion though; other approaches are welcome.
    Also, the class could be renamed, since DefinedAvroSerializer doesn't describe it well.
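
    A hedged example of how a topology might then use such a helper (the registerSchema method and config key follow the suggestion above; the class and schema names here are placeholders, not necessarily what gets merged):

    ```
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.storm.Config;

    public class AvroTopologySetup {
        //Sketch: register the known schemas in the conf and wire the serializer into Kryo
        public static Config buildConf(Schema... knownSchemas) {
            Config conf = new Config();
            for (Schema schema : knownSchemas) {
                DefinedAvroSerializer.registerSchema(conf, schema);
            }
            conf.registerSerialization(GenericData.Record.class, DefinedAvroSerializer.class);
            return conf;
        }
    }
    ```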



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-177294606
  
    Done with a first pass.  You need to upmerge.  We have moved to org.apache.storm from backtype.storm and the CI is failing because of it.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-178637591
  
    @dossett 
    
    storm-starter failed the same way in travis for both JDK8 and JDK7: something odd with logging. I will try to reproduce it and see if it is just a fluke or an actual issue.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/1052#issuecomment-178200940
  
    rat exclusions updated, imports fixed, commits squashed.
    
    Thank you again @revans2 and @abhishekagarwal87 for the feedback. I ended up learning way more from this exercise than I expected.



[GitHub] storm pull request: STORM-1504: Add Serializer and instruction for...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1052#discussion_r51038995
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericSerializer.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.hdfs.common;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericContainer;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericDatumWriter;
    +import org.apache.avro.io.BinaryEncoder;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.io.EncoderFactory;
    +
    +import java.io.IOException;
    +
    +//Generously adapted from:
    +//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
    +//Which has as an ASL2.0 license
    +public class AvroGenericSerializer extends Serializer<GenericContainer> {
    +    @Override
    +    public void write(Kryo kryo, Output output, GenericContainer record) {
    +        output.writeString(record.getSchema().toString());
    +        GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
    +
    +        BinaryEncoder encoder = EncoderFactory
    +                .get()
    +                .directBinaryEncoder(output, null);
    +        try {
    +            writer.write(record, encoder);
    +        } catch (IOException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
    +        Schema theSchema = new Schema.Parser().parse(input.readString());
    --- End diff --
    
    Thanks, do you have suggestions for a different approach?

