Posted to commits@storm.apache.org by do...@apache.org on 2016/02/02 19:58:24 UTC
[1/4] storm git commit: STORM-1504: Add Serializer and instruction for AvroGenericRecordBolt
Repository: storm
Updated Branches:
refs/heads/master f2dcee6f3 -> 02a44c7fc
STORM-1504: Add Serializer and instruction for AvroGenericRecordBolt
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5ca89b2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5ca89b2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5ca89b2
Branch: refs/heads/master
Commit: e5ca89b292aa8f7a983b29d610f71313594e32e3
Parents: 66d7a39
Author: Aaron Dossett <aa...@target.com>
Authored: Tue Feb 2 10:27:52 2016 -0600
Committer: Aaron Dossett <aa...@target.com>
Committed: Tue Feb 2 10:27:52 2016 -0600
----------------------------------------------------------------------
external/storm-hdfs/README.md | 13 ++-
external/storm-hdfs/pom.xml | 22 ++++++
.../storm/hdfs/avro/AbstractAvroSerializer.java | 80 +++++++++++++++++++
.../storm/hdfs/avro/AvroSchemaRegistry.java | 28 +++++++
.../org/apache/storm/hdfs/avro/AvroUtils.java | 44 +++++++++++
.../hdfs/avro/ConfluentAvroSerializer.java | 83 ++++++++++++++++++++
.../storm/hdfs/avro/FixedAvroSerializer.java | 67 ++++++++++++++++
.../storm/hdfs/avro/GenericAvroSerializer.java | 36 +++++++++
.../storm/hdfs/bolt/AvroGenericRecordBolt.java | 4 -
.../hdfs/avro/TestFixedAvroSerializer.java | 76 ++++++++++++++++++
.../hdfs/avro/TestGenericAvroSerializer.java | 68 ++++++++++++++++
.../test/resources/FixedAvroSerializer.config | 2 +
pom.xml | 3 +
13 files changed, 521 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index bf63ad9..76a8602 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -307,7 +307,6 @@ The `org.apache.storm.hdfs.bolt.AvroGenericRecordBolt` class allows you to write
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
```
-
The setup is very similar to the `SequenceFileBolt` example above. The key difference is that instead of specifying a
`SequenceFormat` you must provide a string representation of an Avro schema through the `withSchemaAsString()` method.
An `org.apache.avro.Schema` object cannot be directly provided since it does not implement `Serializable`.
@@ -315,6 +314,18 @@ An `org.apache.avro.Schema` object cannot be directly provided since it does not
The AvroGenericRecordBolt expects to receive tuples containing an Avro GenericRecord that conforms to the provided
schema.
+To use this bolt you **must** register the appropriate Kryo serializers with your topology configuration. A convenience
+method is provided for this:
+
+```AvroUtils.addAvroKryoSerializations(conf);```
+
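+By default Storm will use the ```GenericAvroSerializer``` to handle serialization. This works, but it sends the complete
+schema with every record; there are much faster options if you can pre-define the schemas you will be using or use an
+external schema registry. Implementations for both cases are provided (```FixedAvroSerializer``` and
+```ConfluentAvroSerializer```, the latter for the Confluent Schema Registry), and others can be implemented and provided
+to Storm. To select one, set ```avro.serializer``` in the topology configuration to the serializer's fully qualified
+class name before registering the serializations. Please see the javadoc for the classes in
+```org.apache.storm.hdfs.avro``` for information about using the built-in options or creating your own.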
+
+
## HDFS Bolt support for Trident API
storm-hdfs also includes a Trident `state` implementation for writing data to HDFS, with an API that closely mirrors
that of the bolts.
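The README names the registration method but not the configuration keys it depends on. The stdlib-only sketch below collects them in a plain `java.util.Map` as a stand-in for `org.apache.storm.Config` (which extends `HashMap<String, Object>`): `avro.serializer` is read by `AvroUtils`, and `avro.schemaregistry.confluent` is read only by `ConfluentAvroSerializer`. The helper `avroSettings` and the example values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class AvroTopologySettings {
    // Hypothetical helper: builds the Avro-related entries used by this commit's classes.
    // With a real org.apache.storm.Config you would put the same entries and then call
    // AvroUtils.addAvroKryoSerializations(conf).
    static Map<String, Object> avroSettings(String serializerClassName, String registryUrl) {
        Map<String, Object> conf = new HashMap<>();
        if (serializerClassName != null) {
            // Fully qualified class name; AvroUtils falls back to GenericAvroSerializer if absent
            conf.put("avro.serializer", serializerClassName);
        }
        if (registryUrl != null) {
            // Only ConfluentAvroSerializer reads this key, in the form "http://HOST:PORT"
            conf.put("avro.schemaregistry.confluent", registryUrl);
        }
        return conf;
    }
}
```

For `FixedAvroSerializer`, no URL is needed, but `FixedAvroSerializer.config` must be packaged in the topology jar.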
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 65b8eb7..29d3db1 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -35,6 +35,13 @@
</developer>
</developers>
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>http://packages.confluent.io/maven</url>
+ </repository>
+ </repositories>
+
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
@@ -74,6 +81,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -183,6 +194,17 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-avro-serializer</artifactId>
+ <version>1.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
new file mode 100644
index 0000000..ddf015d
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.IOException;
+
+//Generously adapted from:
+//https://github.com/kijiproject/kiji-express/blob/master/kiji-express/src/main/scala/org/kiji/express/flow/framework/serialization/AvroSerializer.scala
+//Which has as an ASL2.0 license
+
+/**
+ * This abstract class can be extended to implement concrete classes capable of (de)serializing generic Avro objects
+ * across a topology. The methods in the AvroSchemaRegistry interface specify how schemas are mapped to unique
+ * identifiers and vice versa. Implementations based on pre-defined schemas or on an external schema registry
+ * are provided.
+ */
+public abstract class AbstractAvroSerializer extends Serializer<GenericContainer> implements AvroSchemaRegistry {
+
+ @Override
+ public void write(Kryo kryo, Output output, GenericContainer record) {
+
+ String fingerPrint = this.getFingerprint(record.getSchema());
+ output.writeString(fingerPrint);
+ GenericDatumWriter<GenericContainer> writer = new GenericDatumWriter<>(record.getSchema());
+
+ BinaryEncoder encoder = EncoderFactory
+ .get()
+ .directBinaryEncoder(output, null);
+ try {
+ writer.write(record, encoder);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
+ Schema theSchema = this.getSchema(input.readString());
+ GenericDatumReader<GenericContainer> reader = new GenericDatumReader<>(theSchema);
+ Decoder decoder = DecoderFactory
+ .get()
+ .directBinaryDecoder(input, null);
+
+ try {
+ return reader.read(null, decoder);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
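To make the framing in `write()` and `read()` concrete, here is a stdlib-only sketch. Assumptions: `DataOutputStream.writeUTF` stands in for Kryo's `Output.writeString` (the byte layouts differ), and an opaque `byte[]` stands in for the Avro-encoded record. The real code needs no length prefix because the Avro decoder, given the schema, knows where the record ends, which is presumably why the direct (unbuffered) encoder and decoder are used; the sketch adds a prefix only because raw bytes are not self-delimiting. `FingerprintFraming` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class FingerprintFraming {
    // One decoded frame: the schema fingerprint plus the (opaque) record body
    static final class Frame {
        final String fingerprint;
        final byte[] body;

        Frame(String fingerprint, byte[] body) {
            this.fingerprint = fingerprint;
            this.body = body;
        }
    }

    // Writes [fingerprint][body length][body]; writeUTF stands in for Kryo's writeString
    static byte[] write(String fingerprint, byte[] body) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(fingerprint);
            out.writeInt(body.length); // needed only because this body is opaque bytes
            out.write(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // mirrors the unchecked wrapping in the commit
        }
        return bytes.toByteArray();
    }

    // Reads the fingerprint first; a registry would map it to a Schema before decoding the body
    static Frame read(byte[] frame) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame))) {
            String fingerprint = in.readUTF();
            byte[] body = new byte[in.readInt()];
            in.readFully(body);
            return new Frame(fingerprint, body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```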
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java
new file mode 100644
index 0000000..0d1dc8b
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.Schema;
+
+import java.io.Serializable;
+
+public interface AvroSchemaRegistry extends Serializable {
+ String getFingerprint(Schema schema);
+
+ Schema getSchema(String fingerPrint);
+}
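`AvroSchemaRegistry` only requires that `getFingerprint` and `getSchema` be inverses of each other. The sketch below is a hypothetical in-memory implementation, generic in `S` so it runs without Avro (`S` stands in for `org.apache.avro.Schema`); it assigns sequential ids much as a central registry would. It is only suitable inside a single JVM: ids assigned in one worker are unknown to another, which is why the implementations in this commit rely on a shared config file, the schema text itself, or an external registry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the AvroSchemaRegistry contract, generic in the schema type S
interface SchemaRegistrySketch<S> {
    String getFingerprint(S schema);

    S getSchema(String fingerprint);
}

class InMemorySchemaRegistry<S> implements SchemaRegistrySketch<S> {
    private final Map<S, String> schemaToId = new HashMap<>();
    private final Map<String, S> idToSchema = new HashMap<>();

    // Assigns the next sequential id the first time a schema is seen; repeat calls are stable
    @Override
    public synchronized String getFingerprint(S schema) {
        return schemaToId.computeIfAbsent(schema, s -> {
            String id = Integer.toString(idToSchema.size() + 1);
            idToSchema.put(id, s);
            return id;
        });
    }

    // Inverse lookup; fails loudly instead of returning null for an unknown fingerprint
    @Override
    public synchronized S getSchema(String fingerprint) {
        S schema = idToSchema.get(fingerprint);
        if (schema == null) {
            throw new IllegalArgumentException("Unknown fingerprint: " + fingerprint);
        }
        return schema;
    }
}
```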
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java
new file mode 100644
index 0000000..5549291
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.storm.Config;
+
+public class AvroUtils {
+ /**
+ * A helper method to extract Avro serialization settings from the topology configuration and register the
+ * matching Kryo serializer for GenericData.Record. GenericAvroSerializer is used if no serializer is specified in the
+ * configuration. "avro.serializer" should specify the fully qualified class name of the serializer, e.g.
+ * "org.apache.storm.hdfs.avro.GenericAvroSerializer"
+ *
+ * @param conf The topology configuration
+ * @throws ClassNotFoundException If the specified serializer cannot be located.
+ */
+ public static void addAvroKryoSerializations(Config conf) throws ClassNotFoundException {
+ final Class serializerClass;
+ if (conf.containsKey("avro.serializer")) {
+ serializerClass = Class.forName((String)conf.get("avro.serializer"));
+ }
+ else {
+ serializerClass = GenericAvroSerializer.class;
+ }
+ conf.registerSerialization(GenericData.Record.class, serializerClass);
+ conf.setSkipMissingKryoRegistrations(false);
+ }
+}
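`addAvroKryoSerializations` falls back to `GenericAvroSerializer` when `avro.serializer` is absent. The stdlib-only sketch below reproduces that fallback with one addition that is our assumption, not part of this commit: `Class.asSubclass` rejects a class that does not extend the expected base type as soon as the configuration is read, rather than likely failing later when Storm instantiates the serializer. `SerializerResolver` is a hypothetical name, and a missing class is rethrown as `IllegalArgumentException` instead of the checked `ClassNotFoundException` the real method declares.

```java
import java.util.Map;

class SerializerResolver {
    // Mirrors AvroUtils' fallback: use the class named under `key`, else `fallback`.
    // The asSubclass() type check is an addition for illustration, not part of this commit.
    static <T> Class<? extends T> resolve(Map<String, ?> conf, String key,
                                          Class<T> base, Class<? extends T> fallback) {
        Object name = conf.get(key);
        if (name == null) {
            return fallback;
        }
        try {
            // Throws ClassCastException if the named class does not extend `base`
            return Class.forName((String) name).asSubclass(base);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Serializer class not found: " + name, e);
        }
    }
}
```

In a topology, `base` would be Kryo's `Serializer` and `fallback` would be `GenericAvroSerializer.class`.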
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
new file mode 100644
index 0000000..2008a3e
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import com.esotericsoftware.kryo.Kryo;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This class provides a mechanism to use the Confluent Schema Registry (https://github.com/confluentinc/schema-registry)
+ * for Storm to (de)serialize Avro generic records across a topology. It assumes the schema registry is already running,
+ * independently of Storm.
+ */
+public class ConfluentAvroSerializer extends AbstractAvroSerializer {
+
+ private SchemaRegistryClient theClient;
+ private final String url;
+
+ /**
+ * A constructor for use by test cases ONLY, thus the default scope.
+ * @param url The complete URL reference of a confluent schema registry, e.g. "http://HOST:PORT"
+ */
+ ConfluentAvroSerializer(String url) {
+ this.url = url;
+ this.theClient = new CachedSchemaRegistryClient(this.url, 10000);
+ }
+
+ /**
+ * A constructor with a signature that Storm can locate and use with kryo registration.
+ * See Storm's SerializationFactory class for details
+ *
+ * @param k Unused, but must be present for SerializationFactory to find this constructor
+ * @param stormConf The global Storm configuration. Must define "avro.schemaregistry.confluent" to locate the
+ * Confluent schema registry. It should be in the form of "http://HOST:PORT"
+ */
+ public ConfluentAvroSerializer(Kryo k, Map stormConf) {
+ url = (String) stormConf.get("avro.schemaregistry.confluent");
+ this.theClient = new CachedSchemaRegistryClient(this.url, 10000);
+ }
+
+ @Override
+ public String getFingerprint(Schema schema) {
+ final String subject = schema.getName();
+ final int guid;
+ try {
+ guid = theClient.register(subject, schema);
+ } catch (IOException | RestClientException e) {
+ throw new RuntimeException(e);
+ }
+ return Integer.toString(guid);
+ }
+
+ @Override
+ public Schema getSchema(String fingerPrint) {
+ final Schema theSchema;
+ try {
+ theSchema = theClient.getByID(Integer.parseInt(fingerPrint));
+ } catch (IOException | RestClientException e) {
+ throw new RuntimeException(e);
+ }
+ return theSchema;
+ }
+}
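The `(Kryo, Map)` constructor reads `avro.schemaregistry.confluent` without checking it, so a missing key would likely surface only when the registry client is first used. A hypothetical validation helper using only `java.net.URI` might look like the sketch below; requiring an http or https scheme with a host is our assumption, based on the `http://HOST:PORT` form given in the javadoc.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

class RegistryConfig {
    static final String KEY = "avro.schemaregistry.confluent";

    // Hypothetical check of the value ConfluentAvroSerializer reads from stormConf
    static String registryUrlFrom(Map<?, ?> stormConf) {
        Object value = stormConf.get(KEY);
        if (!(value instanceof String)) {
            throw new IllegalArgumentException(KEY + " must be set to a URL such as http://HOST:PORT");
        }
        String url = (String) value;
        try {
            URI uri = new URI(url);
            boolean httpScheme = "http".equals(uri.getScheme()) || "https".equals(uri.getScheme());
            if (!httpScheme || uri.getHost() == null) {
                throw new IllegalArgumentException(KEY + " is not an http(s) URL: " + url);
            }
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(KEY + " is not a valid URI: " + url, e);
        }
        return url;
    }
}
```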
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
new file mode 100644
index 0000000..4dd5fdc
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.commons.codec.binary.Base64;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A class to help (de)serialize a pre-defined set of Avro schemas. Schemas should be listed, one per line, in a file
+ * called "FixedAvroSerializer.config", which must be part of the Storm topology jar file. Any schemas intended to be
+ * used with this class **MUST** be defined in that file.
+ */
+public class FixedAvroSerializer extends AbstractAvroSerializer {
+
+ private static final String FP_ALGO = "CRC-64-AVRO";
+ final Map<String, Schema> fingerprint2schemaMap = new HashMap<>();
+ final Map<Schema, String> schema2fingerprintMap = new HashMap<>();
+
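+ public FixedAvroSerializer() throws IOException, NoSuchAlgorithmException {
+ InputStream in = this.getClass().getClassLoader().getResourceAsStream("FixedAvroSerializer.config");
+ if (in == null) {
+ throw new IOException("FixedAvroSerializer.config was not found on the classpath");
+ }
+
+ // One JSON schema per line; blank lines are skipped
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ if (line.trim().isEmpty()) {
+ continue;
+ }
+ Schema schema = new Schema.Parser().parse(line);
+ byte[] fp = SchemaNormalization.parsingFingerprint(FP_ALGO, schema);
+ // Encode (not decode) the raw fingerprint bytes so they form a printable, unique string
+ String fingerPrint = new String(Base64.encodeBase64(fp));
+
+ fingerprint2schemaMap.put(fingerPrint, schema);
+ schema2fingerprintMap.put(schema, fingerPrint);
+ }
+ }
+ }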
+
+ @Override
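+ public String getFingerprint(Schema schema) {
+ String fingerPrint = schema2fingerprintMap.get(schema);
+ if (fingerPrint == null) {
+ throw new IllegalArgumentException("Schema is not listed in FixedAvroSerializer.config: " + schema);
+ }
+ return fingerPrint;
+ }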
+
+ @Override
+ public Schema getSchema(String fingerPrint) {
+ return fingerprint2schemaMap.get(fingerPrint);
+ }
+}
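`FixedAvroSerializer` keys schemas by `SchemaNormalization.parsingFingerprint("CRC-64-AVRO", schema)`, which, per the Avro specification, is a 64-bit Rabin fingerprint of the schema's Parsing Canonical Form. The sketch below reimplements that fingerprint from the specification for illustration only. It assumes the input string is already in canonical form (it does not normalize), emits the 8 bytes in the little-endian order we understand Avro's implementation to use, and Base64-encodes them with `java.util.Base64`. Encoding the raw bytes gives a 12-character ASCII string that maps one-to-one to the fingerprint, which is the property the two lookup maps need. `AvroCrc64` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class AvroCrc64 {
    // Constant from the "Schema Fingerprints" section of the Avro specification
    static final long EMPTY = 0xc15d213aa4d7a795L;
    private static final long[] TABLE = new long[256];

    static {
        for (int i = 0; i < 256; i++) {
            long fp = i;
            for (int j = 0; j < 8; j++) {
                fp = (fp >>> 1) ^ (EMPTY & -(fp & 1L));
            }
            TABLE[i] = fp;
        }
    }

    // 64-bit Rabin fingerprint of the given bytes, table-driven as in the specification
    static long fingerprint64(byte[] data) {
        long fp = EMPTY;
        for (byte b : data) {
            fp = (fp >>> 8) ^ TABLE[(int) (fp ^ b) & 0xff];
        }
        return fp;
    }

    // Fingerprint of an (assumed already canonical) schema, as 8 little-endian bytes in Base64
    static String base64Fingerprint(String canonicalSchema) {
        long fp = fingerprint64(canonicalSchema.getBytes(StandardCharsets.UTF_8));
        byte[] bytes = new byte[8];
        for (int i = 0; i < 8; i++) {
            bytes[i] = (byte) fp;
            fp >>>= 8;
        }
        return Base64.getEncoder().encodeToString(bytes);
    }
}
```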
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
new file mode 100644
index 0000000..ecf8c49
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.Schema;
+
+/**
+ * A default implementation of AbstractAvroSerializer that uses the full JSON schema string as the fingerprint, so every
+ * serialized record carries its complete schema. This should only be used if no other serializer fits a use case.
+ */
+public class GenericAvroSerializer extends AbstractAvroSerializer {
+ @Override
+ public String getFingerprint(Schema schema) {
+ return schema.toString();
+ }
+
+ @Override
+ public Schema getSchema(String fingerPrint) {
+ return new Schema.Parser().parse(fingerPrint);
+ }
+}
\ No newline at end of file
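`GenericAvroSerializer.getSchema()` creates a new `Schema.Parser` and parses the full JSON schema for every record it reads. One possible mitigation, not part of this commit, is to memoize parsed schemas by their JSON text. The hypothetical `ParsedSchemaCache` below is a bounded LRU cache built on `LinkedHashMap`; `S` stands in for `org.apache.avro.Schema`, and the parse function would be something like `json -> new Schema.Parser().parse(json)`, creating a fresh parser per call as the original does, since (as far as we know) a single `Schema.Parser` refuses to redefine a named type it has already parsed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical bounded cache in front of a parse function; S stands in for org.apache.avro.Schema
class ParsedSchemaCache<S> {
    private final Function<String, S> parser;
    private final Map<String, S> cache;

    ParsedSchemaCache(Function<String, S> parser, int maxEntries) {
        this.parser = parser;
        // accessOrder=true makes iteration order least-recently-used first
        this.cache = new LinkedHashMap<String, S>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, S> eldest) {
                return size() > maxEntries; // evict the LRU entry once over capacity
            }
        };
    }

    // Parses only on a cache miss; synchronized because LinkedHashMap is not thread-safe
    synchronized S get(String schemaJson) {
        return cache.computeIfAbsent(schemaJson, parser);
    }
}
```

The cache saves parsing work only; each record still carries its full schema string over the wire.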
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
index c817c98..1fd2e2f 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
@@ -17,8 +17,6 @@
*/
package org.apache.storm.hdfs.bolt;
-import org.apache.storm.Config;
-import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
@@ -41,8 +39,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.EnumSet;
-import java.util.List;
-import java.util.LinkedList;
import java.util.Map;
public class AvroGenericRecordBolt extends AbstractHdfsBolt{
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
new file mode 100644
index 0000000..a584f91
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestFixedAvroSerializer {
+ //These should match FixedAvroSerializer.config in the test resources
+ private static final String schemaString1 = "{\"type\":\"record\"," +
+ "\"name\":\"stormtest1\"," +
+ "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
+ "{ \"name\":\"int1\", \"type\":\"int\" }]}";
+ private static final String schemaString2 = "{\"type\":\"record\"," +
+ "\"name\":\"stormtest2\"," +
+ "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," +
+ "{ \"name\":\"intint1\", \"type\":\"int\" }]}";
+ private static final Schema schema1;
+ private static final Schema schema2;
+
+ final AvroSchemaRegistry reg;
+
+ static {
+
+ Schema.Parser parser = new Schema.Parser();
+ schema1 = parser.parse(schemaString1);
+
+ parser = new Schema.Parser();
+ schema2 = parser.parse(schemaString2);
+ }
+
+ public TestFixedAvroSerializer() throws Exception{
+ reg = new FixedAvroSerializer();
+ }
+
+ @Test
+ public void testSchemas() {
+ testTheSchema(schema1);
+ testTheSchema(schema2);
+ }
+
+ @Test public void testDifferentFPs() {
+ String fp1 = reg.getFingerprint(schema1);
+ String fp2 = reg.getFingerprint(schema2);
+
+ Assert.assertNotEquals(fp1, fp2);
+ }
+
+ private void testTheSchema(Schema schema) {
+ String fp1 = reg.getFingerprint(schema);
+ Schema found = reg.getSchema(fp1);
+ String fp2 = reg.getFingerprint(found);
+
+ Assert.assertEquals(schema, found);
+ Assert.assertEquals(fp1, fp2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
new file mode 100644
index 0000000..ddfdcf5
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.avro;
+
+import org.apache.avro.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestGenericAvroSerializer {
+ private static final String schemaString1 = "{\"type\":\"record\"," +
+ "\"name\":\"stormtest1\"," +
+ "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," +
+ "{ \"name\":\"int1\", \"type\":\"int\" }]}";
+ private static final String schemaString2 = "{\"type\":\"record\"," +
+ "\"name\":\"stormtest2\"," +
+ "\"fields\":[{\"name\":\"foobar1\",\"type\":\"string\"}," +
+ "{ \"name\":\"intint1\", \"type\":\"int\" }]}";
+ private static final Schema schema1;
+ private static final Schema schema2;
+
+ AvroSchemaRegistry reg = new GenericAvroSerializer();
+
+ static {
+
+ Schema.Parser parser = new Schema.Parser();
+ schema1 = parser.parse(schemaString1);
+
+ parser = new Schema.Parser();
+ schema2 = parser.parse(schemaString2);
+ }
+
+ @Test
+ public void testSchemas() {
+ testTheSchema(schema1);
+ testTheSchema(schema2);
+ }
+
+ @Test public void testDifferentFPs() {
+ String fp1 = reg.getFingerprint(schema1);
+ String fp2 = reg.getFingerprint(schema2);
+
+ Assert.assertNotEquals(fp1, fp2);
+ }
+
+ private void testTheSchema(Schema schema) {
+ String fp1 = reg.getFingerprint(schema);
+ Schema found = reg.getSchema(fp1);
+ String fp2 = reg.getFingerprint(found);
+
+ Assert.assertEquals(schema, found);
+ Assert.assertEquals(fp1, fp2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/external/storm-hdfs/src/test/resources/FixedAvroSerializer.config
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/resources/FixedAvroSerializer.config b/external/storm-hdfs/src/test/resources/FixedAvroSerializer.config
new file mode 100644
index 0000000..971d411
--- /dev/null
+++ b/external/storm-hdfs/src/test/resources/FixedAvroSerializer.config
@@ -0,0 +1,2 @@
+{"type":"record", "name":"stormtest1", "fields":[{"name":"foo1","type":"string"}, {"name":"int1", "type":"int" }]}
+{"type":"record", "name":"stormtest2", "fields":[{"name":"foobar1","type":"string"}, {"name":"intint1", "type":"int" }]}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e5ca89b2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 75345db..831059a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -357,6 +357,9 @@
<!-- StormSQL -->
<exclude>**/src/codegen/config.fmpp</exclude>
<exclude>**/src/codegen/data/Parser.tdd</exclude>
+
+ <!-- Avro Serializer Test Resource -->
+ <exclude>**/src/test/resources/FixedAvroSerializer.config</exclude>
</excludes>
</configuration>
</plugin>
[4/4] storm git commit: updated CHANGELOG with STORM-1054
Posted by do...@apache.org.
updated CHANGELOG with STORM-1054
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02a44c7f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02a44c7f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02a44c7f
Branch: refs/heads/master
Commit: 02a44c7fc1b7b3a1571b326fde7bcae13e1b5c8d
Parents: 0dac59b
Author: Aaron Dossett <aa...@target.com>
Authored: Tue Feb 2 12:46:29 2016 -0600
Committer: Aaron Dossett <aa...@target.com>
Committed: Tue Feb 2 12:46:29 2016 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/02a44c7f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4a73fcd..956da92 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
## 2.0.0
* STORM-1257: port backtype.storm.zookeeper to java
+ * STORM-1504: Add Serializer and instruction for AvroGenericRecordBolt
## 1.0.0
* STORM-1510: Fix broken nimbus log link
[2/4] storm git commit: Update README.md
Posted by do...@apache.org.
Update README.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0d3c3477
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0d3c3477
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0d3c3477
Branch: refs/heads/master
Commit: 0d3c3477a199e4305566dee5936328e768614cc7
Parents: e5ca89b
Author: Aaron Niskode-Dossett <aa...@target.com>
Authored: Tue Feb 2 12:04:32 2016 -0600
Committer: Aaron Niskode-Dossett <aa...@target.com>
Committed: Tue Feb 2 12:04:32 2016 -0600
----------------------------------------------------------------------
external/storm-hdfs/README.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0d3c3477/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 76a8602..2fc4c7b 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -317,7 +317,7 @@ schema.
To use this bolt you **must** register the appropriate Kryo serializers with your topology configuration. A convenience
method is provided for this:
-```AvroUtils.addAvroKryoSerializations(conf);```
+`AvroUtils.addAvroKryoSerializations(conf);`
By default Storm will use the ```GenericAvroSerializer``` to handle serialization. This will work, but there are much
faster options available if you can pre-define the schemas you will be using or utilize an external schema registry. An
@@ -559,4 +559,4 @@ under the License.
# Committer Sponsors
* P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
- * Bobby Evans ([bobby@apache.org](mailto:bobby@apache.org))
\ No newline at end of file
+ * Bobby Evans ([bobby@apache.org](mailto:bobby@apache.org))
[3/4] storm git commit: Merge branch 'STORM-1504' of https://github.com/dossett/storm into STORM-1504
Posted by do...@apache.org.
Merge branch 'STORM-1504' of https://github.com/dossett/storm into STORM-1504
STORM-1504: Add Serializer and instruction for AvroGenericRecordBolt
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0dac59b5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0dac59b5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0dac59b5
Branch: refs/heads/master
Commit: 0dac59b594912570b21b1e5b917ff7b6d1cbc392
Parents: f2dcee6 0d3c347
Author: Aaron Dossett <aa...@target.com>
Authored: Tue Feb 2 12:45:07 2016 -0600
Committer: Aaron Dossett <aa...@target.com>
Committed: Tue Feb 2 12:45:07 2016 -0600
----------------------------------------------------------------------
external/storm-hdfs/README.md | 15 +++-
external/storm-hdfs/pom.xml | 22 ++++++
.../storm/hdfs/avro/AbstractAvroSerializer.java | 80 +++++++++++++++++++
.../storm/hdfs/avro/AvroSchemaRegistry.java | 28 +++++++
.../org/apache/storm/hdfs/avro/AvroUtils.java | 44 +++++++++++
.../hdfs/avro/ConfluentAvroSerializer.java | 83 ++++++++++++++++++++
.../storm/hdfs/avro/FixedAvroSerializer.java | 67 ++++++++++++++++
.../storm/hdfs/avro/GenericAvroSerializer.java | 36 +++++++++
.../storm/hdfs/bolt/AvroGenericRecordBolt.java | 4 -
.../hdfs/avro/TestFixedAvroSerializer.java | 76 ++++++++++++++++++
.../hdfs/avro/TestGenericAvroSerializer.java | 68 ++++++++++++++++
.../test/resources/FixedAvroSerializer.config | 2 +
pom.xml | 3 +
13 files changed, 522 insertions(+), 6 deletions(-)
----------------------------------------------------------------------