You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/30 19:59:00 UTC
[1/9] [FLINK-1117] Clean up flink-avro project: remove deprecated
AvroRecord format, migrate tests to new java api.
Repository: incubator-flink
Updated Branches:
refs/heads/master e7c4c8586 -> 15e599062
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
new file mode 100644
index 0000000..505857e
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.generated;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type
_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+ @Deprecated public java.lang.CharSequence name;
+ @Deprecated public java.lang.Integer favorite_number;
+ @Deprecated public java.lang.CharSequence favorite_color;
+ @Deprecated public java.lang.Long type_long_test;
+ @Deprecated public java.lang.Object type_double_test;
+ @Deprecated public java.lang.Object type_null_test;
+ @Deprecated public java.lang.Object type_bool_test;
+ @Deprecated public java.util.List<java.lang.CharSequence> type_array_string;
+ @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean;
+ @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array;
+ @Deprecated public org.apache.flink.api.io.avro.generated.Colors type_enum;
+ @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use {@link \#newBuilder()}.
+ */
+ public User() {}
+
+ /**
+ * All-args constructor.
+ */
+ public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) {
+ this.name = name;
+ this.favorite_number = favorite_number;
+ this.favorite_color = favorite_color;
+ this.type_long_test = type_long_test;
+ this.type_double_test = type_double_test;
+ this.type_null_test = type_null_test;
+ this.type_bool_test = type_bool_test;
+ this.type_array_string = type_array_string;
+ this.type_array_boolean = type_array_boolean;
+ this.type_nullable_array = type_nullable_array;
+ this.type_enum = type_enum;
+ this.type_map = type_map;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return name;
+ case 1: return favorite_number;
+ case 2: return favorite_color;
+ case 3: return type_long_test;
+ case 4: return type_double_test;
+ case 5: return type_null_test;
+ case 6: return type_bool_test;
+ case 7: return type_array_string;
+ case 8: return type_array_boolean;
+ case 9: return type_nullable_array;
+ case 10: return type_enum;
+ case 11: return type_map;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: name = (java.lang.CharSequence)value$; break;
+ case 1: favorite_number = (java.lang.Integer)value$; break;
+ case 2: favorite_color = (java.lang.CharSequence)value$; break;
+ case 3: type_long_test = (java.lang.Long)value$; break;
+ case 4: type_double_test = (java.lang.Object)value$; break;
+ case 5: type_null_test = (java.lang.Object)value$; break;
+ case 6: type_bool_test = (java.lang.Object)value$; break;
+ case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break;
+ case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break;
+ case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break;
+ case 10: type_enum = (org.apache.flink.api.io.avro.generated.Colors)value$; break;
+ case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'name' field.
+ */
+ public java.lang.CharSequence getName() {
+ return name;
+ }
+
+ /**
+ * Sets the value of the 'name' field.
+ * @param value the value to set.
+ */
+ public void setName(java.lang.CharSequence value) {
+ this.name = value;
+ }
+
+ /**
+ * Gets the value of the 'favorite_number' field.
+ */
+ public java.lang.Integer getFavoriteNumber() {
+ return favorite_number;
+ }
+
+ /**
+ * Sets the value of the 'favorite_number' field.
+ * @param value the value to set.
+ */
+ public void setFavoriteNumber(java.lang.Integer value) {
+ this.favorite_number = value;
+ }
+
+ /**
+ * Gets the value of the 'favorite_color' field.
+ */
+ public java.lang.CharSequence getFavoriteColor() {
+ return favorite_color;
+ }
+
+ /**
+ * Sets the value of the 'favorite_color' field.
+ * @param value the value to set.
+ */
+ public void setFavoriteColor(java.lang.CharSequence value) {
+ this.favorite_color = value;
+ }
+
+ /**
+ * Gets the value of the 'type_long_test' field.
+ */
+ public java.lang.Long getTypeLongTest() {
+ return type_long_test;
+ }
+
+ /**
+ * Sets the value of the 'type_long_test' field.
+ * @param value the value to set.
+ */
+ public void setTypeLongTest(java.lang.Long value) {
+ this.type_long_test = value;
+ }
+
+ /**
+ * Gets the value of the 'type_double_test' field.
+ */
+ public java.lang.Object getTypeDoubleTest() {
+ return type_double_test;
+ }
+
+ /**
+ * Sets the value of the 'type_double_test' field.
+ * @param value the value to set.
+ */
+ public void setTypeDoubleTest(java.lang.Object value) {
+ this.type_double_test = value;
+ }
+
+ /**
+ * Gets the value of the 'type_null_test' field.
+ */
+ public java.lang.Object getTypeNullTest() {
+ return type_null_test;
+ }
+
+ /**
+ * Sets the value of the 'type_null_test' field.
+ * @param value the value to set.
+ */
+ public void setTypeNullTest(java.lang.Object value) {
+ this.type_null_test = value;
+ }
+
+ /**
+ * Gets the value of the 'type_bool_test' field.
+ */
+ public java.lang.Object getTypeBoolTest() {
+ return type_bool_test;
+ }
+
+ /**
+ * Sets the value of the 'type_bool_test' field.
+ * @param value the value to set.
+ */
+ public void setTypeBoolTest(java.lang.Object value) {
+ this.type_bool_test = value;
+ }
+
+ /**
+ * Gets the value of the 'type_array_string' field.
+ */
+ public java.util.List<java.lang.CharSequence> getTypeArrayString() {
+ return type_array_string;
+ }
+
+ /**
+ * Sets the value of the 'type_array_string' field.
+ * @param value the value to set.
+ */
+ public void setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
+ this.type_array_string = value;
+ }
+
+ /**
+ * Gets the value of the 'type_array_boolean' field.
+ */
+ public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
+ return type_array_boolean;
+ }
+
+ /**
+ * Sets the value of the 'type_array_boolean' field.
+ * @param value the value to set.
+ */
+ public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
+ this.type_array_boolean = value;
+ }
+
+ /**
+ * Gets the value of the 'type_nullable_array' field.
+ */
+ public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
+ return type_nullable_array;
+ }
+
+ /**
+ * Sets the value of the 'type_nullable_array' field.
+ * @param value the value to set.
+ */
+ public void setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
+ this.type_nullable_array = value;
+ }
+
+ /**
+ * Gets the value of the 'type_enum' field.
+ */
+ public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() {
+ return type_enum;
+ }
+
+ /**
+ * Sets the value of the 'type_enum' field.
+ * @param value the value to set.
+ */
+ public void setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) {
+ this.type_enum = value;
+ }
+
+ /**
+ * Gets the value of the 'type_map' field.
+ */
+ public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
+ return type_map;
+ }
+
+ /**
+ * Sets the value of the 'type_map' field.
+ * @param value the value to set.
+ */
+ public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
+ this.type_map = value;
+ }
+
+ /** Creates a new User RecordBuilder */
+ public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder() {
+ return new org.apache.flink.api.io.avro.generated.User.Builder();
+ }
+
+ /** Creates a new User RecordBuilder by copying an existing Builder */
+ public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User.Builder other) {
+ return new org.apache.flink.api.io.avro.generated.User.Builder(other);
+ }
+
+ /** Creates a new User RecordBuilder by copying an existing User instance */
+ public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User other) {
+ return new org.apache.flink.api.io.avro.generated.User.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for User instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
+ implements org.apache.avro.data.RecordBuilder<User> {
+
+ private java.lang.CharSequence name;
+ private java.lang.Integer favorite_number;
+ private java.lang.CharSequence favorite_color;
+ private java.lang.Long type_long_test;
+ private java.lang.Object type_double_test;
+ private java.lang.Object type_null_test;
+ private java.lang.Object type_bool_test;
+ private java.util.List<java.lang.CharSequence> type_array_string;
+ private java.util.List<java.lang.Boolean> type_array_boolean;
+ private java.util.List<java.lang.CharSequence> type_nullable_array;
+ private org.apache.flink.api.io.avro.generated.Colors type_enum;
+ private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.flink.api.io.avro.generated.User.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.flink.api.io.avro.generated.User.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.name)) {
+ this.name = data().deepCopy(fields()[0].schema(), other.name);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.favorite_number)) {
+ this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.favorite_color)) {
+ this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+ fieldSetFlags()[2] = true;
+ }
+ if (isValidValue(fields()[3], other.type_long_test)) {
+ this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
+ fieldSetFlags()[3] = true;
+ }
+ if (isValidValue(fields()[4], other.type_double_test)) {
+ this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
+ fieldSetFlags()[4] = true;
+ }
+ if (isValidValue(fields()[5], other.type_null_test)) {
+ this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
+ fieldSetFlags()[5] = true;
+ }
+ if (isValidValue(fields()[6], other.type_bool_test)) {
+ this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
+ fieldSetFlags()[6] = true;
+ }
+ if (isValidValue(fields()[7], other.type_array_string)) {
+ this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
+ fieldSetFlags()[7] = true;
+ }
+ if (isValidValue(fields()[8], other.type_array_boolean)) {
+ this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
+ fieldSetFlags()[8] = true;
+ }
+ if (isValidValue(fields()[9], other.type_nullable_array)) {
+ this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
+ fieldSetFlags()[9] = true;
+ }
+ if (isValidValue(fields()[10], other.type_enum)) {
+ this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
+ fieldSetFlags()[10] = true;
+ }
+ if (isValidValue(fields()[11], other.type_map)) {
+ this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
+ fieldSetFlags()[11] = true;
+ }
+ }
+
+ /** Creates a Builder by copying an existing User instance */
+ private Builder(org.apache.flink.api.io.avro.generated.User other) {
+ super(org.apache.flink.api.io.avro.generated.User.SCHEMA$);
+ if (isValidValue(fields()[0], other.name)) {
+ this.name = data().deepCopy(fields()[0].schema(), other.name);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.favorite_number)) {
+ this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.favorite_color)) {
+ this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+ fieldSetFlags()[2] = true;
+ }
+ if (isValidValue(fields()[3], other.type_long_test)) {
+ this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
+ fieldSetFlags()[3] = true;
+ }
+ if (isValidValue(fields()[4], other.type_double_test)) {
+ this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
+ fieldSetFlags()[4] = true;
+ }
+ if (isValidValue(fields()[5], other.type_null_test)) {
+ this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
+ fieldSetFlags()[5] = true;
+ }
+ if (isValidValue(fields()[6], other.type_bool_test)) {
+ this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
+ fieldSetFlags()[6] = true;
+ }
+ if (isValidValue(fields()[7], other.type_array_string)) {
+ this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
+ fieldSetFlags()[7] = true;
+ }
+ if (isValidValue(fields()[8], other.type_array_boolean)) {
+ this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
+ fieldSetFlags()[8] = true;
+ }
+ if (isValidValue(fields()[9], other.type_nullable_array)) {
+ this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
+ fieldSetFlags()[9] = true;
+ }
+ if (isValidValue(fields()[10], other.type_enum)) {
+ this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
+ fieldSetFlags()[10] = true;
+ }
+ if (isValidValue(fields()[11], other.type_map)) {
+ this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
+ fieldSetFlags()[11] = true;
+ }
+ }
+
+ /** Gets the value of the 'name' field */
+ public java.lang.CharSequence getName() {
+ return name;
+ }
+
+ /** Sets the value of the 'name' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setName(java.lang.CharSequence value) {
+ validate(fields()[0], value);
+ this.name = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'name' field has been set */
+ public boolean hasName() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'name' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearName() {
+ name = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'favorite_number' field */
+ public java.lang.Integer getFavoriteNumber() {
+ return favorite_number;
+ }
+
+ /** Sets the value of the 'favorite_number' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) {
+ validate(fields()[1], value);
+ this.favorite_number = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /** Checks whether the 'favorite_number' field has been set */
+ public boolean hasFavoriteNumber() {
+ return fieldSetFlags()[1];
+ }
+
+ /** Clears the value of the 'favorite_number' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteNumber() {
+ favorite_number = null;
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'favorite_color' field */
+ public java.lang.CharSequence getFavoriteColor() {
+ return favorite_color;
+ }
+
+ /** Sets the value of the 'favorite_color' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence value) {
+ validate(fields()[2], value);
+ this.favorite_color = value;
+ fieldSetFlags()[2] = true;
+ return this;
+ }
+
+ /** Checks whether the 'favorite_color' field has been set */
+ public boolean hasFavoriteColor() {
+ return fieldSetFlags()[2];
+ }
+
+ /** Clears the value of the 'favorite_color' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteColor() {
+ favorite_color = null;
+ fieldSetFlags()[2] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'type_long_test' field */
+ public java.lang.Long getTypeLongTest() {
+ return type_long_test;
+ }
+
+ /** Sets the value of the 'type_long_test' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) {
+ validate(fields()[3], value);
+ this.type_long_test = value;
+ fieldSetFlags()[3] = true;
+ return this;
+ }
+
+ /** Checks whether the 'type_long_test' field has been set */
+ public boolean hasTypeLongTest() {
+ return fieldSetFlags()[3];
+ }
+
+ /** Clears the value of the 'type_long_test' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearTypeLongTest() {
+ type_long_test = null;
+ fieldSetFlags()[3] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'type_double_test' field */
+ public java.lang.Object getTypeDoubleTest() {
+ return type_double_test;
+ }
+
+ /** Sets the value of the 'type_double_test' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) {
+ validate(fields()[4], value);
+ this.type_double_test = value;
+ fieldSetFlags()[4] = true;
+ return this;
+ }
+
+ /** Checks whether the 'type_double_test' field has been set */
+ public boolean hasTypeDoubleTest() {
+ return fieldSetFlags()[4];
+ }
+
+ /** Clears the value of the 'type_double_test' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearTypeDoubleTest() {
+ type_double_test = null;
+ fieldSetFlags()[4] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'type_null_test' field */
+ public java.lang.Object getTypeNullTest() {
+ return type_null_test;
+ }
+
+ /** Sets the value of the 'type_null_test' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) {
+ validate(fields()[5], value);
+ this.type_null_test = value;
+ fieldSetFlags()[5] = true;
+ return this;
+ }
+
+ /** Checks whether the 'type_null_test' field has been set */
+ public boolean hasTypeNullTest() {
+ return fieldSetFlags()[5];
+ }
+
+ /** Clears the value of the 'type_null_test' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullTest() {
+ type_null_test = null;
+ fieldSetFlags()[5] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'type_bool_test' field */
+ public java.lang.Object getTypeBoolTest() {
+ return type_bool_test;
+ }
+
+ /** Sets the value of the 'type_bool_test' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) {
+ validate(fields()[6], value);
+ this.type_bool_test = value;
+ fieldSetFlags()[6] = true;
+ return this;
+ }
+
+ /** Checks whether the 'type_bool_test' field has been set */
+ public boolean hasTypeBoolTest() {
+ return fieldSetFlags()[6];
+ }
+
+ /** Clears the value of the 'type_bool_test' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearTypeBoolTest() {
+ type_bool_test = null;
+ fieldSetFlags()[6] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'type_array_string' field */
+ public java.util.List<java.lang.CharSequence> getTypeArrayString() {
+ return type_array_string;
+ }
+
+ /** Sets the value of the 'type_array_string' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
+ validate(fields()[7], value);
+ this.type_array_string = value;
+ fieldSetFlags()[7] = true;
+ return this;
+ }
+
+ /** Checks whether the 'type_array_string' field has been set */
+ public boolean hasTypeArrayString() {
+ return fieldSetFlags()[7];
+ }
+
+ /** Clears the value of the 'type_array_string' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayString() {
+ type_array_string = null;
+ fieldSetFlags()[7] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'type_array_boolean' field */
+ public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
+ return type_array_boolean;
+ }
+
+ /** Sets the value of the 'type_array_boolean' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
+ validate(fields()[8], value);
+ this.type_array_boolean = value;
+ fieldSetFlags()[8] = true;
+ return this;
+ }
+
+ /** Checks whether the 'type_array_boolean' field has been set */
+ public boolean hasTypeArrayBoolean() {
+ return fieldSetFlags()[8];
+ }
+
+ /** Clears the value of the 'type_array_boolean' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayBoolean() {
+ type_array_boolean = null;
+ fieldSetFlags()[8] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'type_nullable_array' field */
+ public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
+ return type_nullable_array;
+ }
+
+ /** Sets the value of the 'type_nullable_array' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
+ validate(fields()[9], value);
+ this.type_nullable_array = value;
+ fieldSetFlags()[9] = true;
+ return this;
+ }
+
+ /** Checks whether the 'type_nullable_array' field has been set */
+ public boolean hasTypeNullableArray() {
+ return fieldSetFlags()[9];
+ }
+
+ /** Clears the value of the 'type_nullable_array' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullableArray() {
+ type_nullable_array = null;
+ fieldSetFlags()[9] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'type_enum' field */
+ public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() {
+ return type_enum;
+ }
+
+ /** Sets the value of the 'type_enum' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) {
+ validate(fields()[10], value);
+ this.type_enum = value;
+ fieldSetFlags()[10] = true;
+ return this;
+ }
+
+ /** Checks whether the 'type_enum' field has been set */
+ public boolean hasTypeEnum() {
+ return fieldSetFlags()[10];
+ }
+
+ /** Clears the value of the 'type_enum' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearTypeEnum() {
+ type_enum = null;
+ fieldSetFlags()[10] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'type_map' field */
+ public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
+ return type_map;
+ }
+
+ /** Sets the value of the 'type_map' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
+ validate(fields()[11], value);
+ this.type_map = value;
+ fieldSetFlags()[11] = true;
+ return this;
+ }
+
+ /** Checks whether the 'type_map' field has been set */
+ public boolean hasTypeMap() {
+ return fieldSetFlags()[11];
+ }
+
+ /** Clears the value of the 'type_map' field */
+ public org.apache.flink.api.io.avro.generated.User.Builder clearTypeMap() {
+ type_map = null;
+ fieldSetFlags()[11] = false;
+ return this;
+ }
+
+ @Override
+ public User build() {
+ try {
+ User record = new User();
+ record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+ record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
+ record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
+ record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]);
+ record.type_double_test = fieldSetFlags()[4] ? this.type_double_test : (java.lang.Object) defaultValue(fields()[4]);
+ record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]);
+ record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]);
+ record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]);
+ record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List<java.lang.Boolean>) defaultValue(fields()[8]);
+ record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[9]);
+ record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.io.avro.generated.Colors) defaultValue(fields()[10]);
+ record.type_map = fieldSetFlags()[11] ? this.type_map : (java.util.Map<java.lang.CharSequence,java.lang.Long>) defaultValue(fields()[11]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
deleted file mode 100644
index 8e0462f..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.junit.Assert;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.BooleanListValue;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.LongMapValue;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.StringListValue;
-import org.apache.flink.api.java.record.io.avro.generated.Colors;
-import org.apache.flink.api.java.record.io.avro.generated.User;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-/**
- * Test the avro input format.
- * (The testcase is mostly the getting started tutorial of avro)
- * http://avro.apache.org/docs/current/gettingstartedjava.html
- */
-public class AvroRecordInputFormatTest {
-
- private File testFile;
-
- private final AvroRecordInputFormat format = new AvroRecordInputFormat();
- final static String TEST_NAME = "Alyssa";
-
- final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
- final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-
- final static boolean TEST_ARRAY_BOOLEAN_1 = true;
- final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-
- final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-
- final static CharSequence TEST_MAP_KEY1 = "KEY 1";
- final static long TEST_MAP_VALUE1 = 8546456L;
- final static CharSequence TEST_MAP_KEY2 = "KEY 2";
- final static long TEST_MAP_VALUE2 = 17554L;
-
- @Before
- public void createFiles() throws IOException {
- testFile = File.createTempFile("AvroInputFormatTest", null);
-
- ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
- stringArray.add(TEST_ARRAY_STRING_1);
- stringArray.add(TEST_ARRAY_STRING_2);
-
- ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
- booleanArray.add(TEST_ARRAY_BOOLEAN_1);
- booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-
- HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
- longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
- longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-
-
- User user1 = new User();
- user1.setName(TEST_NAME);
- user1.setFavoriteNumber(256);
- user1.setTypeDoubleTest(123.45d);
- user1.setTypeBoolTest(true);
- user1.setTypeArrayString(stringArray);
- user1.setTypeArrayBoolean(booleanArray);
- user1.setTypeEnum(TEST_ENUM_COLOR);
- user1.setTypeMap(longMap);
-
- // Construct via builder
- User user2 = User.newBuilder()
- .setName("Charlie")
- .setFavoriteColor("blue")
- .setFavoriteNumber(null)
- .setTypeBoolTest(false)
- .setTypeDoubleTest(1.337d)
- .setTypeNullTest(null)
- .setTypeLongTest(1337L)
- .setTypeArrayString(new ArrayList<CharSequence>())
- .setTypeArrayBoolean(new ArrayList<Boolean>())
- .setTypeNullableArray(null)
- .setTypeEnum(Colors.RED)
- .setTypeMap(new HashMap<CharSequence, Long>())
- .build();
- DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
- DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
- dataFileWriter.create(user1.getSchema(), testFile);
- dataFileWriter.append(user1);
- dataFileWriter.append(user2);
- dataFileWriter.close();
- }
-
- @Test
- public void testDeserialisation() throws IOException {
- Configuration parameters = new Configuration();
- format.setFilePath(testFile.toURI().toString());
- format.configure(parameters);
- FileInputSplit[] splits = format.createInputSplits(1);
- Assert.assertEquals(splits.length, 1);
- format.open(splits[0]);
- Record record = new Record();
- Assert.assertNotNull(format.nextRecord(record));
- StringValue name = record.getField(0, StringValue.class);
- Assert.assertNotNull("empty record", name);
- Assert.assertEquals("name not equal",name.getValue(), TEST_NAME);
-
- // check arrays
- StringListValue sl = record.getField(7, AvroRecordInputFormat.StringListValue.class);
- Assert.assertEquals("element 0 not equal", sl.get(0).getValue(), TEST_ARRAY_STRING_1);
- Assert.assertEquals("element 1 not equal", sl.get(1).getValue(), TEST_ARRAY_STRING_2);
-
- BooleanListValue bl = record.getField(8, AvroRecordInputFormat.BooleanListValue.class);
- Assert.assertEquals("element 0 not equal", bl.get(0).getValue(), TEST_ARRAY_BOOLEAN_1);
- Assert.assertEquals("element 1 not equal", bl.get(1).getValue(), TEST_ARRAY_BOOLEAN_2);
-
- // check enums
- StringValue enumValue = record.getField(10, StringValue.class);
- Assert.assertEquals("string representation of enum not equal", enumValue.getValue(), TEST_ENUM_COLOR.toString());
-
- // check maps
- LongMapValue lm = record.getField(11, AvroRecordInputFormat.LongMapValue.class);
- Assert.assertEquals("map value of key 1 not equal", lm.get(new StringValue(TEST_MAP_KEY1)).getValue(), TEST_MAP_VALUE1);
- Assert.assertEquals("map value of key 2 not equal", lm.get(new StringValue(TEST_MAP_KEY2)).getValue(), TEST_MAP_VALUE2);
-
-
- Assert.assertFalse("expecting second element", format.reachedEnd());
- Assert.assertNotNull("expecting second element", format.nextRecord(record));
-
- Assert.assertNull(format.nextRecord(record));
- Assert.assertTrue(format.reachedEnd());
-
- format.close();
- }
-
- @After
- public void deleteFiles() {
- testFile.delete();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
deleted file mode 100644
index e5ddd20..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.generated;
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public enum Colors {
- RED, GREEN, BLUE ;
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
- public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
deleted file mode 100644
index 86a486a..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
+++ /dev/null
@@ -1,755 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.generated;
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"n
ame\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}");
- public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
- @Deprecated public java.lang.CharSequence name;
- @Deprecated public java.lang.Integer favorite_number;
- @Deprecated public java.lang.CharSequence favorite_color;
- @Deprecated public java.lang.Long type_long_test;
- @Deprecated public java.lang.Object type_double_test;
- @Deprecated public java.lang.Object type_null_test;
- @Deprecated public java.lang.Object type_bool_test;
- @Deprecated public java.util.List<java.lang.CharSequence> type_array_string;
- @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean;
- @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array;
- @Deprecated public org.apache.flink.api.java.record.io.avro.generated.Colors type_enum;
- @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
- /**
- * Default constructor. Note that this does not initialize fields
- * to their default values from the schema. If that is desired then
- * one should use {@link \#newBuilder()}.
- */
- public User() {}
-
- /**
- * All-args constructor.
- */
- public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.java.record.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) {
- this.name = name;
- this.favorite_number = favorite_number;
- this.favorite_color = favorite_color;
- this.type_long_test = type_long_test;
- this.type_double_test = type_double_test;
- this.type_null_test = type_null_test;
- this.type_bool_test = type_bool_test;
- this.type_array_string = type_array_string;
- this.type_array_boolean = type_array_boolean;
- this.type_nullable_array = type_nullable_array;
- this.type_enum = type_enum;
- this.type_map = type_map;
- }
-
- public org.apache.avro.Schema getSchema() { return SCHEMA$; }
- // Used by DatumWriter. Applications should not call.
- public java.lang.Object get(int field$) {
- switch (field$) {
- case 0: return name;
- case 1: return favorite_number;
- case 2: return favorite_color;
- case 3: return type_long_test;
- case 4: return type_double_test;
- case 5: return type_null_test;
- case 6: return type_bool_test;
- case 7: return type_array_string;
- case 8: return type_array_boolean;
- case 9: return type_nullable_array;
- case 10: return type_enum;
- case 11: return type_map;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
- // Used by DatumReader. Applications should not call.
- @SuppressWarnings(value="unchecked")
- public void put(int field$, java.lang.Object value$) {
- switch (field$) {
- case 0: name = (java.lang.CharSequence)value$; break;
- case 1: favorite_number = (java.lang.Integer)value$; break;
- case 2: favorite_color = (java.lang.CharSequence)value$; break;
- case 3: type_long_test = (java.lang.Long)value$; break;
- case 4: type_double_test = (java.lang.Object)value$; break;
- case 5: type_null_test = (java.lang.Object)value$; break;
- case 6: type_bool_test = (java.lang.Object)value$; break;
- case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break;
- case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break;
- case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break;
- case 10: type_enum = (org.apache.flink.api.java.record.io.avro.generated.Colors)value$; break;
- case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
-
- /**
- * Gets the value of the 'name' field.
- */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /**
- * Sets the value of the 'name' field.
- * @param value the value to set.
- */
- public void setName(java.lang.CharSequence value) {
- this.name = value;
- }
-
- /**
- * Gets the value of the 'favorite_number' field.
- */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /**
- * Sets the value of the 'favorite_number' field.
- * @param value the value to set.
- */
- public void setFavoriteNumber(java.lang.Integer value) {
- this.favorite_number = value;
- }
-
- /**
- * Gets the value of the 'favorite_color' field.
- */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /**
- * Sets the value of the 'favorite_color' field.
- * @param value the value to set.
- */
- public void setFavoriteColor(java.lang.CharSequence value) {
- this.favorite_color = value;
- }
-
- /**
- * Gets the value of the 'type_long_test' field.
- */
- public java.lang.Long getTypeLongTest() {
- return type_long_test;
- }
-
- /**
- * Sets the value of the 'type_long_test' field.
- * @param value the value to set.
- */
- public void setTypeLongTest(java.lang.Long value) {
- this.type_long_test = value;
- }
-
- /**
- * Gets the value of the 'type_double_test' field.
- */
- public java.lang.Object getTypeDoubleTest() {
- return type_double_test;
- }
-
- /**
- * Sets the value of the 'type_double_test' field.
- * @param value the value to set.
- */
- public void setTypeDoubleTest(java.lang.Object value) {
- this.type_double_test = value;
- }
-
- /**
- * Gets the value of the 'type_null_test' field.
- */
- public java.lang.Object getTypeNullTest() {
- return type_null_test;
- }
-
- /**
- * Sets the value of the 'type_null_test' field.
- * @param value the value to set.
- */
- public void setTypeNullTest(java.lang.Object value) {
- this.type_null_test = value;
- }
-
- /**
- * Gets the value of the 'type_bool_test' field.
- */
- public java.lang.Object getTypeBoolTest() {
- return type_bool_test;
- }
-
- /**
- * Sets the value of the 'type_bool_test' field.
- * @param value the value to set.
- */
- public void setTypeBoolTest(java.lang.Object value) {
- this.type_bool_test = value;
- }
-
- /**
- * Gets the value of the 'type_array_string' field.
- */
- public java.util.List<java.lang.CharSequence> getTypeArrayString() {
- return type_array_string;
- }
-
- /**
- * Sets the value of the 'type_array_string' field.
- * @param value the value to set.
- */
- public void setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
- this.type_array_string = value;
- }
-
- /**
- * Gets the value of the 'type_array_boolean' field.
- */
- public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
- return type_array_boolean;
- }
-
- /**
- * Sets the value of the 'type_array_boolean' field.
- * @param value the value to set.
- */
- public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
- this.type_array_boolean = value;
- }
-
- /**
- * Gets the value of the 'type_nullable_array' field.
- */
- public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
- return type_nullable_array;
- }
-
- /**
- * Sets the value of the 'type_nullable_array' field.
- * @param value the value to set.
- */
- public void setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
- this.type_nullable_array = value;
- }
-
- /**
- * Gets the value of the 'type_enum' field.
- */
- public org.apache.flink.api.java.record.io.avro.generated.Colors getTypeEnum() {
- return type_enum;
- }
-
- /**
- * Sets the value of the 'type_enum' field.
- * @param value the value to set.
- */
- public void setTypeEnum(org.apache.flink.api.java.record.io.avro.generated.Colors value) {
- this.type_enum = value;
- }
-
- /**
- * Gets the value of the 'type_map' field.
- */
- public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
- return type_map;
- }
-
- /**
- * Sets the value of the 'type_map' field.
- * @param value the value to set.
- */
- public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
- this.type_map = value;
- }
-
- /** Creates a new User RecordBuilder */
- public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder() {
- return new org.apache.flink.api.java.record.io.avro.generated.User.Builder();
- }
-
- /** Creates a new User RecordBuilder by copying an existing Builder */
- public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.generated.User.Builder other) {
- return new org.apache.flink.api.java.record.io.avro.generated.User.Builder(other);
- }
-
- /** Creates a new User RecordBuilder by copying an existing User instance */
- public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.generated.User other) {
- return new org.apache.flink.api.java.record.io.avro.generated.User.Builder(other);
- }
-
- /**
- * RecordBuilder for User instances.
- */
- public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
- implements org.apache.avro.data.RecordBuilder<User> {
-
- private java.lang.CharSequence name;
- private java.lang.Integer favorite_number;
- private java.lang.CharSequence favorite_color;
- private java.lang.Long type_long_test;
- private java.lang.Object type_double_test;
- private java.lang.Object type_null_test;
- private java.lang.Object type_bool_test;
- private java.util.List<java.lang.CharSequence> type_array_string;
- private java.util.List<java.lang.Boolean> type_array_boolean;
- private java.util.List<java.lang.CharSequence> type_nullable_array;
- private org.apache.flink.api.java.record.io.avro.generated.Colors type_enum;
- private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
- /** Creates a new Builder */
- private Builder() {
- super(org.apache.flink.api.java.record.io.avro.generated.User.SCHEMA$);
- }
-
- /** Creates a Builder by copying an existing Builder */
- private Builder(org.apache.flink.api.java.record.io.avro.generated.User.Builder other) {
- super(other);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- if (isValidValue(fields()[3], other.type_long_test)) {
- this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
- fieldSetFlags()[3] = true;
- }
- if (isValidValue(fields()[4], other.type_double_test)) {
- this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
- fieldSetFlags()[4] = true;
- }
- if (isValidValue(fields()[5], other.type_null_test)) {
- this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
- fieldSetFlags()[5] = true;
- }
- if (isValidValue(fields()[6], other.type_bool_test)) {
- this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
- fieldSetFlags()[6] = true;
- }
- if (isValidValue(fields()[7], other.type_array_string)) {
- this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
- fieldSetFlags()[7] = true;
- }
- if (isValidValue(fields()[8], other.type_array_boolean)) {
- this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
- fieldSetFlags()[8] = true;
- }
- if (isValidValue(fields()[9], other.type_nullable_array)) {
- this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
- fieldSetFlags()[9] = true;
- }
- if (isValidValue(fields()[10], other.type_enum)) {
- this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
- fieldSetFlags()[10] = true;
- }
- if (isValidValue(fields()[11], other.type_map)) {
- this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
- fieldSetFlags()[11] = true;
- }
- }
-
- /** Creates a Builder by copying an existing User instance */
- private Builder(org.apache.flink.api.java.record.io.avro.generated.User other) {
- super(org.apache.flink.api.java.record.io.avro.generated.User.SCHEMA$);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- if (isValidValue(fields()[3], other.type_long_test)) {
- this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
- fieldSetFlags()[3] = true;
- }
- if (isValidValue(fields()[4], other.type_double_test)) {
- this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
- fieldSetFlags()[4] = true;
- }
- if (isValidValue(fields()[5], other.type_null_test)) {
- this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
- fieldSetFlags()[5] = true;
- }
- if (isValidValue(fields()[6], other.type_bool_test)) {
- this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
- fieldSetFlags()[6] = true;
- }
- if (isValidValue(fields()[7], other.type_array_string)) {
- this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
- fieldSetFlags()[7] = true;
- }
- if (isValidValue(fields()[8], other.type_array_boolean)) {
- this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
- fieldSetFlags()[8] = true;
- }
- if (isValidValue(fields()[9], other.type_nullable_array)) {
- this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
- fieldSetFlags()[9] = true;
- }
- if (isValidValue(fields()[10], other.type_enum)) {
- this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
- fieldSetFlags()[10] = true;
- }
- if (isValidValue(fields()[11], other.type_map)) {
- this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
- fieldSetFlags()[11] = true;
- }
- }
-
- /** Gets the value of the 'name' field */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /** Sets the value of the 'name' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setName(java.lang.CharSequence value) {
- validate(fields()[0], value);
- this.name = value;
- fieldSetFlags()[0] = true;
- return this;
- }
-
- /** Checks whether the 'name' field has been set */
- public boolean hasName() {
- return fieldSetFlags()[0];
- }
-
- /** Clears the value of the 'name' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearName() {
- name = null;
- fieldSetFlags()[0] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_number' field */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /** Sets the value of the 'favorite_number' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) {
- validate(fields()[1], value);
- this.favorite_number = value;
- fieldSetFlags()[1] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_number' field has been set */
- public boolean hasFavoriteNumber() {
- return fieldSetFlags()[1];
- }
-
- /** Clears the value of the 'favorite_number' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearFavoriteNumber() {
- favorite_number = null;
- fieldSetFlags()[1] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_color' field */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /** Sets the value of the 'favorite_color' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence value) {
- validate(fields()[2], value);
- this.favorite_color = value;
- fieldSetFlags()[2] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_color' field has been set */
- public boolean hasFavoriteColor() {
- return fieldSetFlags()[2];
- }
-
- /** Clears the value of the 'favorite_color' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearFavoriteColor() {
- favorite_color = null;
- fieldSetFlags()[2] = false;
- return this;
- }
-
- /** Gets the value of the 'type_long_test' field */
- public java.lang.Long getTypeLongTest() {
- return type_long_test;
- }
-
- /** Sets the value of the 'type_long_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) {
- validate(fields()[3], value);
- this.type_long_test = value;
- fieldSetFlags()[3] = true;
- return this;
- }
-
- /** Checks whether the 'type_long_test' field has been set */
- public boolean hasTypeLongTest() {
- return fieldSetFlags()[3];
- }
-
- /** Clears the value of the 'type_long_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeLongTest() {
- type_long_test = null;
- fieldSetFlags()[3] = false;
- return this;
- }
-
- /** Gets the value of the 'type_double_test' field */
- public java.lang.Object getTypeDoubleTest() {
- return type_double_test;
- }
-
- /** Sets the value of the 'type_double_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) {
- validate(fields()[4], value);
- this.type_double_test = value;
- fieldSetFlags()[4] = true;
- return this;
- }
-
- /** Checks whether the 'type_double_test' field has been set */
- public boolean hasTypeDoubleTest() {
- return fieldSetFlags()[4];
- }
-
- /** Clears the value of the 'type_double_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeDoubleTest() {
- type_double_test = null;
- fieldSetFlags()[4] = false;
- return this;
- }
-
- /** Gets the value of the 'type_null_test' field */
- public java.lang.Object getTypeNullTest() {
- return type_null_test;
- }
-
- /** Sets the value of the 'type_null_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) {
- validate(fields()[5], value);
- this.type_null_test = value;
- fieldSetFlags()[5] = true;
- return this;
- }
-
- /** Checks whether the 'type_null_test' field has been set */
- public boolean hasTypeNullTest() {
- return fieldSetFlags()[5];
- }
-
- /** Clears the value of the 'type_null_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeNullTest() {
- type_null_test = null;
- fieldSetFlags()[5] = false;
- return this;
- }
-
- /** Gets the value of the 'type_bool_test' field */
- public java.lang.Object getTypeBoolTest() {
- return type_bool_test;
- }
-
- /** Sets the value of the 'type_bool_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) {
- validate(fields()[6], value);
- this.type_bool_test = value;
- fieldSetFlags()[6] = true;
- return this;
- }
-
- /** Checks whether the 'type_bool_test' field has been set */
- public boolean hasTypeBoolTest() {
- return fieldSetFlags()[6];
- }
-
- /** Clears the value of the 'type_bool_test' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeBoolTest() {
- type_bool_test = null;
- fieldSetFlags()[6] = false;
- return this;
- }
-
- /** Gets the value of the 'type_array_string' field */
- public java.util.List<java.lang.CharSequence> getTypeArrayString() {
- return type_array_string;
- }
-
- /** Sets the value of the 'type_array_string' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
- validate(fields()[7], value);
- this.type_array_string = value;
- fieldSetFlags()[7] = true;
- return this;
- }
-
- /** Checks whether the 'type_array_string' field has been set */
- public boolean hasTypeArrayString() {
- return fieldSetFlags()[7];
- }
-
- /** Clears the value of the 'type_array_string' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeArrayString() {
- type_array_string = null;
- fieldSetFlags()[7] = false;
- return this;
- }
-
- /** Gets the value of the 'type_array_boolean' field */
- public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
- return type_array_boolean;
- }
-
- /** Sets the value of the 'type_array_boolean' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
- validate(fields()[8], value);
- this.type_array_boolean = value;
- fieldSetFlags()[8] = true;
- return this;
- }
-
- /** Checks whether the 'type_array_boolean' field has been set */
- public boolean hasTypeArrayBoolean() {
- return fieldSetFlags()[8];
- }
-
- /** Clears the value of the 'type_array_boolean' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeArrayBoolean() {
- type_array_boolean = null;
- fieldSetFlags()[8] = false;
- return this;
- }
-
- /** Gets the value of the 'type_nullable_array' field */
- public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
- return type_nullable_array;
- }
-
- /** Sets the value of the 'type_nullable_array' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
- validate(fields()[9], value);
- this.type_nullable_array = value;
- fieldSetFlags()[9] = true;
- return this;
- }
-
- /** Checks whether the 'type_nullable_array' field has been set */
- public boolean hasTypeNullableArray() {
- return fieldSetFlags()[9];
- }
-
- /** Clears the value of the 'type_nullable_array' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeNullableArray() {
- type_nullable_array = null;
- fieldSetFlags()[9] = false;
- return this;
- }
-
- /** Gets the value of the 'type_enum' field */
- public org.apache.flink.api.java.record.io.avro.generated.Colors getTypeEnum() {
- return type_enum;
- }
-
- /** Sets the value of the 'type_enum' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.java.record.io.avro.generated.Colors value) {
- validate(fields()[10], value);
- this.type_enum = value;
- fieldSetFlags()[10] = true;
- return this;
- }
-
- /** Checks whether the 'type_enum' field has been set */
- public boolean hasTypeEnum() {
- return fieldSetFlags()[10];
- }
-
- /** Clears the value of the 'type_enum' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeEnum() {
- type_enum = null;
- fieldSetFlags()[10] = false;
- return this;
- }
-
- /** Gets the value of the 'type_map' field */
- public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
- return type_map;
- }
-
- /** Sets the value of the 'type_map' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
- validate(fields()[11], value);
- this.type_map = value;
- fieldSetFlags()[11] = true;
- return this;
- }
-
- /** Checks whether the 'type_map' field has been set */
- public boolean hasTypeMap() {
- return fieldSetFlags()[11];
- }
-
- /** Clears the value of the 'type_map' field */
- public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeMap() {
- type_map = null;
- fieldSetFlags()[11] = false;
- return this;
- }
-
- @Override
- public User build() {
- try {
- User record = new User();
- record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
- record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
- record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
- record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]);
- record.type_double_test = fieldSetFlags()[4] ? this.type_double_test : (java.lang.Object) defaultValue(fields()[4]);
- record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]);
- record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]);
- record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]);
- record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List<java.lang.Boolean>) defaultValue(fields()[8]);
- record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[9]);
- record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.java.record.io.avro.generated.Colors) defaultValue(fields()[10]);
- record.type_map = fieldSetFlags()[11] ? this.type_map : (java.util.Map<java.lang.CharSequence,java.lang.Long>) defaultValue(fields()[11]);
- return record;
- } catch (Exception e) {
- throw new org.apache.avro.AvroRuntimeException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
new file mode 100644
index 0000000..8a89410
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.reflect.Nullable;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.junit.Test;
+
+public class AvroSerializerEmptyArrayTest {
+
+ @Test
+ public void testBookSerialization() {
+ try {
+ Book b = new Book(123, "This is a test book", 26382648);
+ AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class);
+ SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b);
+ test.testAll();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerialization() {
+ try {
+ List<String> titles = new ArrayList<String>();
+
+ List<Book> books = new ArrayList<Book>();
+ books.add(new Book(123, "This is a test book", 1));
+ books.add(new Book(24234234, "This is a test book", 1));
+ books.add(new Book(1234324, "This is a test book", 3));
+
+ BookAuthor a = new BookAuthor(1, titles, "Test Author");
+ a.books = books;
+ a.bookType = BookAuthor.BookType.journal;
+
+ AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class);
+
+ SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a);
+ test.testAll();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ public static class Book {
+
+ long bookId;
+ @Nullable
+ String title;
+ long authorId;
+
+ public Book() {}
+
+ public Book(long bookId, String title, long authorId) {
+ this.bookId = bookId;
+ this.title = title;
+ this.authorId = authorId;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (authorId ^ (authorId >>> 32));
+ result = prime * result + (int) (bookId ^ (bookId >>> 32));
+ result = prime * result + ((title == null) ? 0 : title.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Book other = (Book) obj;
+ if (authorId != other.authorId)
+ return false;
+ if (bookId != other.bookId)
+ return false;
+ if (title == null) {
+ if (other.title != null)
+ return false;
+ } else if (!title.equals(other.title))
+ return false;
+ return true;
+ }
+ }
+
+ public static class BookAuthor {
+
+ enum BookType {
+ book,
+ article,
+ journal
+ }
+
+ long authorId;
+
+ @Nullable
+ List<String> bookTitles;
+
+ @Nullable
+ List<Book> books;
+
+ String authorName;
+
+ BookType bookType;
+
+ public BookAuthor() {}
+
+ public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+ this.authorId = authorId;
+ this.bookTitles = bookTitles;
+ this.authorName = authorName;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (authorId ^ (authorId >>> 32));
+ result = prime * result + ((authorName == null) ? 0 : authorName.hashCode());
+ result = prime * result + ((bookTitles == null) ? 0 : bookTitles.hashCode());
+ result = prime * result + ((bookType == null) ? 0 : bookType.hashCode());
+ result = prime * result + ((books == null) ? 0 : books.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ BookAuthor other = (BookAuthor) obj;
+ if (authorId != other.authorId)
+ return false;
+ if (authorName == null) {
+ if (other.authorName != null)
+ return false;
+ } else if (!authorName.equals(other.authorName))
+ return false;
+ if (bookTitles == null) {
+ if (other.bookTitles != null)
+ return false;
+ } else if (!bookTitles.equals(other.bookTitles))
+ return false;
+ if (bookType != other.bookType)
+ return false;
+ if (books == null) {
+ if (other.books != null)
+ return false;
+ } else if (!books.equals(other.books))
+ return false;
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 0565b5a..377cbfd 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -23,5 +23,5 @@ under the License.
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
- <suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]java[\\/]record[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
+ <suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
</suppressions>
\ No newline at end of file
[3/9] git commit: add shading for guava
Posted by se...@apache.org.
add shading for guava
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7e43b1c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7e43b1c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7e43b1c7
Branch: refs/heads/master
Commit: 7e43b1c70429f901b73b0221b3a18b709365f8df
Parents: 30e84ff
Author: Robert Metzger <rm...@apache.org>
Authored: Sun Sep 28 20:54:23 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200
----------------------------------------------------------------------
pom.xml | 27 +++++++++++++++++++++++++++
1 file changed, 27 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7e43b1c7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bd3fd69..99980ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -424,6 +424,33 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <artifactSet>
+ <includes>
+ <include>com.google.guava:guava</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version><!--$NO-MVN-MAN-VER$-->
[7/9] git commit: RAT plugin not inhertited,
to prevent redundant checks.
Posted by se...@apache.org.
RAT plugin not inhertited, to prevent redundant checks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6af80fb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6af80fb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6af80fb4
Branch: refs/heads/master
Commit: 6af80fb4720c74f7af3acbdf45dd7afa7d039717
Parents: e7c4c85
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 29 12:25:30 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200
----------------------------------------------------------------------
pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6af80fb4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 624d9b8..bd3fd69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -440,6 +440,7 @@ under the License.
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.10</version><!--$NO-MVN-MAN-VER$-->
+ <inherited>false</inherited>
<executions>
<execution>
<phase>verify</phase>
[4/9] git commit: [FLINK-1072] improved build matrix
Posted by se...@apache.org.
[FLINK-1072] improved build matrix
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/30e84ff7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/30e84ff7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/30e84ff7
Branch: refs/heads/master
Commit: 30e84ff7542e2915ebf6374ccb34a615723f18a9
Parents: 6af80fb
Author: Robert Metzger <rm...@apache.org>
Authored: Sun Sep 28 20:10:07 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200
----------------------------------------------------------------------
.travis.yml | 23 ++++++++++++++++-------
1 file changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30e84ff7/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 0b776c1..3021c94 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,10 +1,22 @@
# s3 deployment based on http://about.travis-ci.org/blog/2012-12-18-travis-artifacts/
language: java
-jdk:
- - oraclejdk8
- - oraclejdk7
- - openjdk6
+
+
+#See https://issues.apache.org/jira/browse/FLINK-1072
+matrix:
+ include:
+ - jdk: "openjdk6"
+ env: PROFILE=
+ - jdk: "oraclejdk7"
+ env: PROFILE=
+ - jdk: "openjdk6"
+ env: PROFILE="-P!include-yarn -Dhadoop.profile=2 -Dhadoop.version=2.0.0-alpha"
+ - jdk: "oraclejdk7"
+ env: PROFILE="-Dhadoop.profile=2 -Dhadoop.version=2.2.0"
+ - jdk: "oraclejdk8"
+ env: PROFILE="-Dhadoop.profile=2 -Dhadoop.version=2.5.0"
+
git:
depth: 10
@@ -27,9 +39,6 @@ env:
- secure: "SNZkMm++fvPbjdreibc4j/XTKy7rOvGvjbvJJLQ01fVDR8ur1FGB5L/CE9tm2Aye75G8br+tJ+gf5cMX8CHL+0VrvuTk5U6flbuM08Pd0pmP64ZncmGfRFKC5qTvt24YR0u3cqxWze8RTkdduz0t8xrEdyCtb94CHs1+RNS+0HA="
# javadocs deploy
- secure: "f4wACodae0NCaIZtmg/JvP40G/3dE7O6+ONDMbq8P/9dnT5peRB1q7+KZxggRmFSWTMPdPIDI7YxI/rM5wZ3pUN2eX0BpjfANT58OJdgCNXN3Hr9YOa8UykD2h6b+AHpy4G89MG65m73RvNXngHTCQ8SBIfewzeMAhHdD3Fk0u8="
- matrix:
- - PROFILE=
- - PROFILE="-Dhadoop.profile=2"
before_script:
- "gem install --no-document --version 0.8.9 faraday "
[2/9] git commit: [FLINK-1117] Clean up flink-avro project: remove
deprecated AvroRecord format, migrate tests to new java api.
Posted by se...@apache.org.
[FLINK-1117] Clean up flink-avro project: remove deprecated AvroRecord format, migrate tests to new java api.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/626d6b78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/626d6b78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/626d6b78
Branch: refs/heads/master
Commit: 626d6b785db649e06951e2f336f5ca411b30dce5
Parents: 38e4755
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 30 14:15:22 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200
----------------------------------------------------------------------
flink-addons/flink-avro/pom.xml | 7 +-
.../apache/flink/api/avro/AvroBaseValue.java | 153 ----
.../api/io/avro/example/AvroTypeExample.java | 111 +++
.../apache/flink/api/io/avro/example/User.java | 269 +++++++
.../java/record/io/avro/AvroInputFormat.java | 111 ---
.../record/io/avro/AvroRecordInputFormat.java | 374 ---------
.../avro/example/ReflectiveAvroTypeExample.java | 161 ----
.../api/java/record/io/avro/example/SUser.java | 25 -
.../api/java/record/io/avro/example/User.java | 269 -------
.../flink/api/avro/AvroOutputFormatTest.java | 2 +-
.../api/avro/AvroWithEmptyArrayITCase.java | 217 ------
.../flink/api/avro/EncoderDecoderTest.java | 4 +-
.../avro/testjar/AvroExternalJarProgram.java | 13 -
.../api/io/avro/AvroRecordInputFormatTest.java | 167 ++++
.../flink/api/io/avro/generated/Colors.java | 32 +
.../flink/api/io/avro/generated/User.java | 755 +++++++++++++++++++
.../io/avro/AvroRecordInputFormatTest.java | 167 ----
.../java/record/io/avro/generated/Colors.java | 32 -
.../api/java/record/io/avro/generated/User.java | 755 -------------------
.../runtime/AvroSerializerEmptyArrayTest.java | 189 +++++
tools/maven/suppressions.xml | 2 +-
21 files changed, 1531 insertions(+), 2284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml
index 68f722c..ff4decc 100644
--- a/flink-addons/flink-avro/pom.xml
+++ b/flink-addons/flink-avro/pom.xml
@@ -54,10 +54,11 @@ under the License.
<!-- version is derived from base module -->
</dependency>
+ <!-- guava needs to be in "provided" scope, so to not re-export the dependency -->
<dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <!-- version is derived from base module -->
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
deleted file mode 100644
index 0d64910..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroBaseValue<T>> {
-
- private static final long serialVersionUID = 1L;
-
-
- public AvroBaseValue() {}
-
- public AvroBaseValue(T datum) {
- super(datum);
- }
-
-
- // --------------------------------------------------------------------------------------------
- // Serialization / Deserialization
- // --------------------------------------------------------------------------------------------
-
- private ReflectDatumWriter<T> writer;
- private ReflectDatumReader<T> reader;
-
- private DataOutputEncoder encoder;
- private DataInputDecoder decoder;
-
-
- @Override
- public void write(DataOutputView out) throws IOException {
- // the null flag
- if (datum() == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
-
- DataOutputEncoder encoder = getEncoder();
- encoder.setOut(out);
- getWriter().write(datum(), encoder);
- }
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- // the null flag
- if (in.readBoolean()) {
-
- DataInputDecoder decoder = getDecoder();
- decoder.setIn(in);
- datum(getReader().read(datum(), decoder));
- }
- }
-
- private ReflectDatumWriter<T> getWriter() {
- if (this.writer == null) {
- @SuppressWarnings("unchecked")
- Class<T> clazz = (Class<T>) datum().getClass();
- this.writer = new ReflectDatumWriter<T>(clazz);
- }
- return this.writer;
- }
-
- private ReflectDatumReader<T> getReader() {
- if (this.reader == null) {
- Class<T> datumClass = ReflectionUtil.getTemplateType1(getClass());
- this.reader = new ReflectDatumReader<T>(datumClass);
- }
- return this.reader;
- }
-
- private DataOutputEncoder getEncoder() {
- if (this.encoder == null) {
- this.encoder = new DataOutputEncoder();
- }
- return this.encoder;
- }
-
- private DataInputDecoder getDecoder() {
- if (this.decoder == null) {
- this.decoder = new DataInputDecoder();
- }
- return this.decoder;
- }
-
- // --------------------------------------------------------------------------------------------
- // Hashing / Equality
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return datum() == null ? 0 : datum().hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == this.getClass()) {
- Object otherDatum = ((AvroBaseValue<?>) obj).datum();
- Object thisDatum = datum();
-
- if (thisDatum == null) {
- return otherDatum == null;
- } else {
- return thisDatum.equals(otherDatum);
- }
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return "AvroBaseValue (" + datum() + ")";
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public int compareTo(AvroBaseValue<T> o) {
- Object otherDatum = o.datum();
- Object thisDatum = datum();
-
- if (thisDatum == null) {
- return otherDatum == null ? 0 : -1;
- } else {
- return otherDatum == null ? 1: ((Comparable<Object>) thisDatum).compareTo(otherDatum);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
new file mode 100644
index 0000000..6affeec
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro.example;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("serial")
+public class AvroTypeExample {
+
+
+ public static void main(String[] args) throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<User> users = env.createInput(new UserGeneratingInputFormat());
+
+ users
+ .map(new NumberExtractingMapper())
+ .groupBy(1)
+ .reduceGroup(new ConcatenatingReducer())
+ .print();
+
+ env.execute();
+ }
+
+
+
+ public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
+
+ @Override
+ public Tuple2<User, Integer> map(User user) {
+ return new Tuple2<User, Integer>(user, user.getFavoriteNumber());
+ }
+ }
+
+
+ public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
+
+ @Override
+ public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
+ int number = 0;
+ StringBuilder colors = new StringBuilder();
+
+ for (Tuple2<User, Integer> u : values) {
+ number = u.f1;
+ colors.append(u.f0.getFavoriteColor()).append(" - ");
+ }
+
+ colors.setLength(colors.length() - 3);
+ out.collect(new Tuple2<Integer, String>(number, colors.toString()));
+ }
+ }
+
+
+ public static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int NUM = 100;
+
+ private final Random rnd = new Random(32498562304986L);
+
+ private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
+
+ private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
+
+ private int count;
+
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return count >= NUM;
+ }
+
+ @Override
+ public User nextRecord(User reuse) throws IOException {
+ count++;
+
+ User u = new User();
+ u.setName(NAMES[rnd.nextInt(NAMES.length)]);
+ u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
+ u.setFavoriteNumber(rnd.nextInt(87));
+ return u;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
new file mode 100644
index 0000000..3394d60
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.example;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+ @Deprecated public java.lang.CharSequence name;
+ @Deprecated public java.lang.Integer favorite_number;
+ @Deprecated public java.lang.CharSequence favorite_color;
+
+ /**
+ * Default constructor. Note that this does not initialize fields
+ * to their default values from the schema. If that is desired then
+ * one should use {@link \#newBuilder()}.
+ */
+ public User() {}
+
+ /**
+ * All-args constructor.
+ */
+ public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
+ this.name = name;
+ this.favorite_number = favorite_number;
+ this.favorite_color = favorite_color;
+ }
+
+ public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+ // Used by DatumWriter. Applications should not call.
+ public java.lang.Object get(int field$) {
+ switch (field$) {
+ case 0: return name;
+ case 1: return favorite_number;
+ case 2: return favorite_color;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+ // Used by DatumReader. Applications should not call.
+ @SuppressWarnings(value="unchecked")
+ public void put(int field$, java.lang.Object value$) {
+ switch (field$) {
+ case 0: name = (java.lang.CharSequence)value$; break;
+ case 1: favorite_number = (java.lang.Integer)value$; break;
+ case 2: favorite_color = (java.lang.CharSequence)value$; break;
+ default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+ }
+ }
+
+ /**
+ * Gets the value of the 'name' field.
+ */
+ public java.lang.CharSequence getName() {
+ return name;
+ }
+
+ /**
+ * Sets the value of the 'name' field.
+ * @param value the value to set.
+ */
+ public void setName(java.lang.CharSequence value) {
+ this.name = value;
+ }
+
+ /**
+ * Gets the value of the 'favorite_number' field.
+ */
+ public java.lang.Integer getFavoriteNumber() {
+ return favorite_number;
+ }
+
+ /**
+ * Sets the value of the 'favorite_number' field.
+ * @param value the value to set.
+ */
+ public void setFavoriteNumber(java.lang.Integer value) {
+ this.favorite_number = value;
+ }
+
+ /**
+ * Gets the value of the 'favorite_color' field.
+ */
+ public java.lang.CharSequence getFavoriteColor() {
+ return favorite_color;
+ }
+
+ /**
+ * Sets the value of the 'favorite_color' field.
+ * @param value the value to set.
+ */
+ public void setFavoriteColor(java.lang.CharSequence value) {
+ this.favorite_color = value;
+ }
+
+ /** Creates a new User RecordBuilder */
+ public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() {
+ return new org.apache.flink.api.io.avro.example.User.Builder();
+ }
+
+ /** Creates a new User RecordBuilder by copying an existing Builder */
+ public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) {
+ return new org.apache.flink.api.io.avro.example.User.Builder(other);
+ }
+
+ /** Creates a new User RecordBuilder by copying an existing User instance */
+ public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User other) {
+ return new org.apache.flink.api.io.avro.example.User.Builder(other);
+ }
+
+ /**
+ * RecordBuilder for User instances.
+ */
+ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
+ implements org.apache.avro.data.RecordBuilder<User> {
+
+ private java.lang.CharSequence name;
+ private java.lang.Integer favorite_number;
+ private java.lang.CharSequence favorite_color;
+
+ /** Creates a new Builder */
+ private Builder() {
+ super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
+ }
+
+ /** Creates a Builder by copying an existing Builder */
+ private Builder(org.apache.flink.api.io.avro.example.User.Builder other) {
+ super(other);
+ if (isValidValue(fields()[0], other.name)) {
+ this.name = data().deepCopy(fields()[0].schema(), other.name);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.favorite_number)) {
+ this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.favorite_color)) {
+ this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+ fieldSetFlags()[2] = true;
+ }
+ }
+
+ /** Creates a Builder by copying an existing User instance */
+ private Builder(org.apache.flink.api.io.avro.example.User other) {
+ super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
+ if (isValidValue(fields()[0], other.name)) {
+ this.name = data().deepCopy(fields()[0].schema(), other.name);
+ fieldSetFlags()[0] = true;
+ }
+ if (isValidValue(fields()[1], other.favorite_number)) {
+ this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+ fieldSetFlags()[1] = true;
+ }
+ if (isValidValue(fields()[2], other.favorite_color)) {
+ this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+ fieldSetFlags()[2] = true;
+ }
+ }
+
+ /** Gets the value of the 'name' field */
+ public java.lang.CharSequence getName() {
+ return name;
+ }
+
+ /** Sets the value of the 'name' field */
+ public org.apache.flink.api.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
+ validate(fields()[0], value);
+ this.name = value;
+ fieldSetFlags()[0] = true;
+ return this;
+ }
+
+ /** Checks whether the 'name' field has been set */
+ public boolean hasName() {
+ return fieldSetFlags()[0];
+ }
+
+ /** Clears the value of the 'name' field */
+ public org.apache.flink.api.io.avro.example.User.Builder clearName() {
+ name = null;
+ fieldSetFlags()[0] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'favorite_number' field */
+ public java.lang.Integer getFavoriteNumber() {
+ return favorite_number;
+ }
+
+ /** Sets the value of the 'favorite_number' field */
+ public org.apache.flink.api.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
+ validate(fields()[1], value);
+ this.favorite_number = value;
+ fieldSetFlags()[1] = true;
+ return this;
+ }
+
+ /** Checks whether the 'favorite_number' field has been set */
+ public boolean hasFavoriteNumber() {
+ return fieldSetFlags()[1];
+ }
+
+ /** Clears the value of the 'favorite_number' field */
+ public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteNumber() {
+ favorite_number = null;
+ fieldSetFlags()[1] = false;
+ return this;
+ }
+
+ /** Gets the value of the 'favorite_color' field */
+ public java.lang.CharSequence getFavoriteColor() {
+ return favorite_color;
+ }
+
+ /** Sets the value of the 'favorite_color' field */
+ public org.apache.flink.api.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
+ validate(fields()[2], value);
+ this.favorite_color = value;
+ fieldSetFlags()[2] = true;
+ return this;
+ }
+
+ /** Checks whether the 'favorite_color' field has been set */
+ public boolean hasFavoriteColor() {
+ return fieldSetFlags()[2];
+ }
+
+ /** Clears the value of the 'favorite_color' field */
+ public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteColor() {
+ favorite_color = null;
+ fieldSetFlags()[2] = false;
+ return this;
+ }
+
+ @Override
+ public User build() {
+ try {
+ User record = new User();
+ record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+ record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
+ record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
+ return record;
+ } catch (Exception e) {
+ throw new org.apache.avro.AvroRuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
deleted file mode 100644
index a898f8d..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public class AvroInputFormat<E> extends FileInputFormat {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
-
-
- private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;
-
- private final Class<E> avroValueType;
-
-
- private transient FileReader<E> dataFileReader;
-
- private transient E reuseAvroValue;
-
- private transient AvroBaseValue<E> wrapper;
-
-
- public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass) {
- this.avroWrapperTypeClass = wrapperClass;
- this.avroValueType = ReflectionUtil.getTemplateType1(wrapperClass);
- this.unsplittable = true;
- }
-
- public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass, Class<E> avroType) {
- this.avroValueType = avroType;
- this.avroWrapperTypeClass = wrapperClass;
- this.unsplittable = true;
- }
-
- @Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
-
- this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
-
- DatumReader<E> datumReader;
- if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
- datumReader = new SpecificDatumReader<E>(avroValueType);
- } else {
- datumReader = new ReflectDatumReader<E>(avroValueType);
- }
-
- LOG.info("Opening split " + split);
-
- SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
-
- dataFileReader = DataFileReader.openReader(in, datumReader);
- dataFileReader.sync(split.getStart());
-
- reuseAvroValue = null;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return !dataFileReader.hasNext();
- }
-
- @Override
- public Record nextRecord(Record record) throws IOException {
- if (!dataFileReader.hasNext()) {
- return null;
- }
-
- reuseAvroValue = dataFileReader.next(reuseAvroValue);
- wrapper.datum(reuseAvroValue);
- record.setField(0, wrapper);
- return record;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
deleted file mode 100644
index 1639ec4..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-/**
- * Input format to read Avro files.
- *
- * The input format currently supports only flat avro schemas. So there is no
- * support for complex types except for nullable primitve fields, e.g.
- * ["string", null] (See
- * http://avro.apache.org/docs/current/spec.html#schema_complex)
- *
- */
-public class AvroRecordInputFormat extends FileInputFormat {
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG = LoggerFactory.getLogger(AvroRecordInputFormat.class);
-
- private FileReader<GenericRecord> dataFileReader;
- private GenericRecord reuseAvroRecord = null;
-
- @Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
- DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
- SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
- LOG.info("Opening split " + split);
- dataFileReader = DataFileReader.openReader(in, datumReader);
- dataFileReader.sync(split.getStart());
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return !dataFileReader.hasNext();
- }
-
- @Override
- public Record nextRecord(Record record) throws IOException {
- if (!dataFileReader.hasNext()) {
- return null;
- }
- if (record == null) {
- throw new IllegalArgumentException("Empty PactRecord given");
- }
- reuseAvroRecord = dataFileReader.next(reuseAvroRecord);
- final List<Field> fields = reuseAvroRecord.getSchema().getFields();
- for (Field field : fields) {
- final Value value = convertAvroToPactValue(field, reuseAvroRecord.get(field.pos()));
- record.setField(field.pos(), value);
- record.updateBinaryRepresenation();
- }
-
- return record;
- }
-
-
- @SuppressWarnings("unchecked")
- private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
- if (avroRecord == null) {
- return null;
- }
- final Type type = checkTypeConstraintsAndGetType(field.schema());
-
- // check for complex types
- // (complex type FIXED is not yet supported)
- switch (type) {
- case ARRAY:
- final Type elementType = field.schema().getElementType().getType();
- final List<?> avroList = (List<?>) avroRecord;
- return convertAvroArrayToListValue(elementType, avroList);
- case ENUM:
- final List<String> symbols = field.schema().getEnumSymbols();
- final String avroRecordString = avroRecord.toString();
- if (!symbols.contains(avroRecordString)) {
- throw new RuntimeException("The given Avro file contains field with a invalid enum symbol");
- }
- sString.setValue(avroRecordString);
- return sString;
- case MAP:
- final Type valueType = field.schema().getValueType().getType();
- final Map<CharSequence, ?> avroMap = (Map<CharSequence, ?>) avroRecord;
- return convertAvroMapToMapValue(valueType, avroMap);
-
- // primitive type
- default:
- return convertAvroPrimitiveToValue(type, avroRecord);
-
- }
- }
-
- private final ListValue<?> convertAvroArrayToListValue(Type elementType, List<?> avroList) {
- switch (elementType) {
- case STRING:
- StringListValue sl = new StringListValue();
- for (Object item : avroList) {
- sl.add(new StringValue((CharSequence) item));
- }
- return sl;
- case INT:
- IntListValue il = new IntListValue();
- for (Object item : avroList) {
- il.add(new IntValue((Integer) item));
- }
- return il;
- case BOOLEAN:
- BooleanListValue bl = new BooleanListValue();
- for (Object item : avroList) {
- bl.add(new BooleanValue((Boolean) item));
- }
- return bl;
- case DOUBLE:
- DoubleListValue dl = new DoubleListValue();
- for (Object item : avroList) {
- dl.add(new DoubleValue((Double) item));
- }
- return dl;
- case FLOAT:
- FloatListValue fl = new FloatListValue();
- for (Object item : avroList) {
- fl.add(new FloatValue((Float) item));
- }
- return fl;
- case LONG:
- LongListValue ll = new LongListValue();
- for (Object item : avroList) {
- ll.add(new LongValue((Long) item));
- }
- return ll;
- default:
- throw new RuntimeException("Elements of type " + elementType + " are not supported for Avro arrays.");
- }
- }
-
- private final MapValue<StringValue, ?> convertAvroMapToMapValue(Type mapValueType, Map<CharSequence, ?> avroMap) {
- switch (mapValueType) {
- case STRING:
- StringMapValue sm = new StringMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- sm.put(new StringValue((CharSequence) entry.getKey()), new StringValue((String) entry.getValue()));
- }
- return sm;
- case INT:
- IntMapValue im = new IntMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- im.put(new StringValue((CharSequence) entry.getKey()), new IntValue((Integer) entry.getValue()));
- }
- return im;
- case BOOLEAN:
- BooleanMapValue bm = new BooleanMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- bm.put(new StringValue((CharSequence) entry.getKey()), new BooleanValue((Boolean) entry.getValue()));
- }
- return bm;
- case DOUBLE:
- DoubleMapValue dm = new DoubleMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- dm.put(new StringValue((CharSequence) entry.getKey()), new DoubleValue((Double) entry.getValue()));
- }
- return dm;
- case FLOAT:
- FloatMapValue fm = new FloatMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- fm.put(new StringValue((CharSequence) entry.getKey()), new FloatValue((Float) entry.getValue()));
- }
- return fm;
- case LONG:
- LongMapValue lm = new LongMapValue();
- for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
- lm.put(new StringValue((CharSequence) entry.getKey()), new LongValue((Long) entry.getValue()));
- }
- return lm;
-
- default:
- throw new RuntimeException("Map values of type " + mapValueType + " are not supported for Avro map.");
- }
- }
-
- private StringValue sString = new StringValue();
- private IntValue sInt = new IntValue();
- private BooleanValue sBool = new BooleanValue();
- private DoubleValue sDouble = new DoubleValue();
- private FloatValue sFloat = new FloatValue();
- private LongValue sLong = new LongValue();
-
- private final Value convertAvroPrimitiveToValue(Type type, Object avroRecord) {
- switch (type) {
- case STRING:
- sString.setValue((CharSequence) avroRecord);
- return sString;
- case INT:
- sInt.setValue((Integer) avroRecord);
- return sInt;
- case BOOLEAN:
- sBool.setValue((Boolean) avroRecord);
- return sBool;
- case DOUBLE:
- sDouble.setValue((Double) avroRecord);
- return sDouble;
- case FLOAT:
- sFloat.setValue((Float) avroRecord);
- return sFloat;
- case LONG:
- sLong.setValue((Long) avroRecord);
- return sLong;
- case NULL:
- return NullValue.getInstance();
- default:
- throw new RuntimeException(
- "Type "
- + type
- + " for AvroInputFormat is not implemented. Open an issue on GitHub.");
- }
- }
-
- private final Type checkTypeConstraintsAndGetType(final Schema schema) {
- final Type type = schema.getType();
- if (type == Type.RECORD) {
- throw new RuntimeException("The given Avro file contains complex data types which are not supported right now");
- }
-
- if (type == Type.UNION) {
- List<Schema> types = schema.getTypes();
- if (types.size() > 2) {
- throw new RuntimeException("The given Avro file contains a union that has more than two elements");
- }
- if (types.size() == 1 && types.get(0).getType() != Type.UNION) {
- return types.get(0).getType();
- }
- if (types.get(0).getType() == Type.UNION || types.get(1).getType() == Type.UNION) {
- throw new RuntimeException("The given Avro file contains a nested union");
- }
- if (types.get(0).getType() == Type.NULL) {
- return types.get(1).getType();
- } else {
- if (types.get(1).getType() != Type.NULL) {
- throw new RuntimeException("The given Avro file is contains a union with two non-null types.");
- }
- return types.get(0).getType();
- }
- }
- return type;
- }
-
- /**
- * Set minNumSplits to number of files.
- */
- @Override
- public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
- int numAvroFiles = 0;
- final Path path = this.filePath;
- // get all the files that are involved in the splits
- final FileSystem fs = path.getFileSystem();
- final FileStatus pathFile = fs.getFileStatus(path);
-
- if (!acceptFile(pathFile)) {
- throw new IOException("The given file does not pass the file-filter");
- }
- if (pathFile.isDir()) {
- // input is directory. list all contained files
- final FileStatus[] dir = fs.listStatus(path);
- for (int i = 0; i < dir.length; i++) {
- if (!dir[i].isDir() && acceptFile(dir[i])) {
- numAvroFiles++;
- }
- }
- } else {
- numAvroFiles = 1;
- }
- return super.createInputSplits(numAvroFiles);
- }
-
- // --------------------------------------------------------------------------------------------
- // Concrete subclasses of ListValue and MapValue for all possible primitive types
- // --------------------------------------------------------------------------------------------
-
- public static class StringListValue extends ListValue<StringValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class IntListValue extends ListValue<IntValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class BooleanListValue extends ListValue<BooleanValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class DoubleListValue extends ListValue<DoubleValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class FloatListValue extends ListValue<FloatValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class LongListValue extends ListValue<LongValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class StringMapValue extends MapValue<StringValue, StringValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class IntMapValue extends MapValue<StringValue, IntValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class BooleanMapValue extends MapValue<StringValue, BooleanValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class DoubleMapValue extends MapValue<StringValue, DoubleValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class FloatMapValue extends MapValue<StringValue, FloatValue> {
- private static final long serialVersionUID = 1L;
- }
-
- public static class LongMapValue extends MapValue<StringValue, LongValue> {
- private static final long serialVersionUID = 1L;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
deleted file mode 100644
index d6d1213..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-
-public class ReflectiveAvroTypeExample {
-
-
- public static void main(String[] args) throws Exception {
-
- GenericDataSource<UserGeneratingInputFormat> source = new GenericDataSource<UserGeneratingInputFormat>(UserGeneratingInputFormat.class);
-
- MapOperator mapper = MapOperator.builder(new NumberExtractingMapper())
- .input(source).name("le mapper").build();
-
- ReduceOperator reducer = ReduceOperator.builder(new ConcatenatingReducer(), IntValue.class, 1)
- .input(mapper).name("le reducer").build();
-
- GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, reducer);
-
- Plan p = new Plan(sink);
- p.setDefaultParallelism(4);
-
- LocalExecutor.execute(p);
- }
-
-
- public static final class NumberExtractingMapper extends MapFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- User u = record.getField(0, SUser.class).datum();
- record.setField(1, new IntValue(u.getFavoriteNumber()));
- out.collect(record);
- }
- }
-
-
- public static final class ConcatenatingReducer extends ReduceFunction implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final Record result = new Record(2);
-
- @Override
- public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
- Record r = records.next();
-
- int num = r.getField(1, IntValue.class).getValue();
- String names = r.getField(0, SUser.class).datum().getFavoriteColor().toString();
-
- while (records.hasNext()) {
- r = records.next();
- names += " - " + r.getField(0, SUser.class).datum().getFavoriteColor().toString();
- }
-
- result.setField(0, new IntValue(num));
- result.setField(1, new StringValue(names));
- out.collect(result);
- }
-
- }
-
-
- public static final class UserGeneratingInputFormat extends GenericInputFormat {
-
- private static final long serialVersionUID = 1L;
-
- private static final int NUM = 100;
-
- private final Random rnd = new Random(32498562304986L);
-
- private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
-
- private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
-
- private int count;
-
-
- @Override
- public boolean reachedEnd() throws IOException {
- return count >= NUM;
- }
-
- @Override
- public Record nextRecord(Record record) throws IOException {
- count++;
-
- User u = new User();
- u.setName(NAMES[rnd.nextInt(NAMES.length)]);
- u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
- u.setFavoriteNumber(rnd.nextInt(87));
-
- SUser su = new SUser();
- su.datum(u);
-
- record.setField(0, su);
- return record;
- }
- }
-
- public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void configure(Configuration parameters) {}
-
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {}
-
- @Override
- public void writeRecord(Record record) throws IOException {
- int color = record.getField(0, IntValue.class).getValue();
- String names = record.getField(1, StringValue.class).getValue();
-
- System.out.println(color + ": " + names);
- }
-
- @Override
- public void close() throws IOException {}
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
deleted file mode 100644
index 542639e..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import org.apache.flink.api.avro.AvroBaseValue;
-
-public class SUser extends AvroBaseValue<User> {
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
deleted file mode 100644
index d4fb292..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.example;
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
- public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
- public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
- @Deprecated public java.lang.CharSequence name;
- @Deprecated public java.lang.Integer favorite_number;
- @Deprecated public java.lang.CharSequence favorite_color;
-
- /**
- * Default constructor. Note that this does not initialize fields
- * to their default values from the schema. If that is desired then
- * one should use {@link \#newBuilder()}.
- */
- public User() {}
-
- /**
- * All-args constructor.
- */
- public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
- this.name = name;
- this.favorite_number = favorite_number;
- this.favorite_color = favorite_color;
- }
-
- public org.apache.avro.Schema getSchema() { return SCHEMA$; }
- // Used by DatumWriter. Applications should not call.
- public java.lang.Object get(int field$) {
- switch (field$) {
- case 0: return name;
- case 1: return favorite_number;
- case 2: return favorite_color;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
- // Used by DatumReader. Applications should not call.
- @SuppressWarnings(value="unchecked")
- public void put(int field$, java.lang.Object value$) {
- switch (field$) {
- case 0: name = (java.lang.CharSequence)value$; break;
- case 1: favorite_number = (java.lang.Integer)value$; break;
- case 2: favorite_color = (java.lang.CharSequence)value$; break;
- default: throw new org.apache.avro.AvroRuntimeException("Bad index");
- }
- }
-
- /**
- * Gets the value of the 'name' field.
- */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /**
- * Sets the value of the 'name' field.
- * @param value the value to set.
- */
- public void setName(java.lang.CharSequence value) {
- this.name = value;
- }
-
- /**
- * Gets the value of the 'favorite_number' field.
- */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /**
- * Sets the value of the 'favorite_number' field.
- * @param value the value to set.
- */
- public void setFavoriteNumber(java.lang.Integer value) {
- this.favorite_number = value;
- }
-
- /**
- * Gets the value of the 'favorite_color' field.
- */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /**
- * Sets the value of the 'favorite_color' field.
- * @param value the value to set.
- */
- public void setFavoriteColor(java.lang.CharSequence value) {
- this.favorite_color = value;
- }
-
- /** Creates a new User RecordBuilder */
- public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder() {
- return new org.apache.flink.api.java.record.io.avro.example.User.Builder();
- }
-
- /** Creates a new User RecordBuilder by copying an existing Builder */
- public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
- return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
- }
-
- /** Creates a new User RecordBuilder by copying an existing User instance */
- public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User other) {
- return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
- }
-
- /**
- * RecordBuilder for User instances.
- */
- public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
- implements org.apache.avro.data.RecordBuilder<User> {
-
- private java.lang.CharSequence name;
- private java.lang.Integer favorite_number;
- private java.lang.CharSequence favorite_color;
-
- /** Creates a new Builder */
- private Builder() {
- super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
- }
-
- /** Creates a Builder by copying an existing Builder */
- private Builder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
- super(other);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- }
-
- /** Creates a Builder by copying an existing User instance */
- private Builder(org.apache.flink.api.java.record.io.avro.example.User other) {
- super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
- if (isValidValue(fields()[0], other.name)) {
- this.name = data().deepCopy(fields()[0].schema(), other.name);
- fieldSetFlags()[0] = true;
- }
- if (isValidValue(fields()[1], other.favorite_number)) {
- this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
- fieldSetFlags()[1] = true;
- }
- if (isValidValue(fields()[2], other.favorite_color)) {
- this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
- fieldSetFlags()[2] = true;
- }
- }
-
- /** Gets the value of the 'name' field */
- public java.lang.CharSequence getName() {
- return name;
- }
-
- /** Sets the value of the 'name' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
- validate(fields()[0], value);
- this.name = value;
- fieldSetFlags()[0] = true;
- return this;
- }
-
- /** Checks whether the 'name' field has been set */
- public boolean hasName() {
- return fieldSetFlags()[0];
- }
-
- /** Clears the value of the 'name' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder clearName() {
- name = null;
- fieldSetFlags()[0] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_number' field */
- public java.lang.Integer getFavoriteNumber() {
- return favorite_number;
- }
-
- /** Sets the value of the 'favorite_number' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
- validate(fields()[1], value);
- this.favorite_number = value;
- fieldSetFlags()[1] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_number' field has been set */
- public boolean hasFavoriteNumber() {
- return fieldSetFlags()[1];
- }
-
- /** Clears the value of the 'favorite_number' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteNumber() {
- favorite_number = null;
- fieldSetFlags()[1] = false;
- return this;
- }
-
- /** Gets the value of the 'favorite_color' field */
- public java.lang.CharSequence getFavoriteColor() {
- return favorite_color;
- }
-
- /** Sets the value of the 'favorite_color' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
- validate(fields()[2], value);
- this.favorite_color = value;
- fieldSetFlags()[2] = true;
- return this;
- }
-
- /** Checks whether the 'favorite_color' field has been set */
- public boolean hasFavoriteColor() {
- return fieldSetFlags()[2];
- }
-
- /** Clears the value of the 'favorite_color' field */
- public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteColor() {
- favorite_color = null;
- fieldSetFlags()[2] = false;
- return this;
- }
-
- @Override
- public User build() {
- try {
- User record = new User();
- record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
- record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
- record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
- return record;
- } catch (Exception e) {
- throw new org.apache.avro.AvroRuntimeException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index a60787d..db235c0 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -28,11 +28,11 @@ import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.io.avro.example.User;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.record.io.avro.example.User;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.util.JavaProgramTestBase;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
deleted file mode 100644
index a3850e2..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
-
- @Override
- protected Plan getTestJob() {
- GenericDataSource<RandomInputFormat> bookSource = new GenericDataSource<RandomInputFormat>(
- new RandomInputFormat(true));
- GenericDataSource<RandomInputFormat> authorSource = new GenericDataSource<RandomInputFormat>(
- new RandomInputFormat(false));
-
- CoGroupOperator coGroupOperator = CoGroupOperator.builder(MyCoGrouper.class, LongValue.class, 0, 0)
- .input1(bookSource).input2(authorSource).name("CoGrouper Test").build();
-
- GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, coGroupOperator);
-
- Plan plan = new Plan(sink, "CoGroper Test Plan");
- plan.setDefaultParallelism(1);
- return plan;
- }
-
- public static class SBookAvroValue extends AvroBaseValue<Book> {
- private static final long serialVersionUID = 1L;
-
- public SBookAvroValue() {}
-
- public SBookAvroValue(Book datum) {
- super(datum);
- }
- }
-
- public static class Book {
-
- long bookId;
- @Nullable
- String title;
- long authorId;
-
- public Book() {
- }
-
- public Book(long bookId, String title, long authorId) {
- this.bookId = bookId;
- this.title = title;
- this.authorId = authorId;
- }
- }
-
- public static class SBookAuthorValue extends AvroBaseValue<BookAuthor> {
- private static final long serialVersionUID = 1L;
-
- public SBookAuthorValue() {}
-
- public SBookAuthorValue(BookAuthor datum) {
- super(datum);
- }
- }
-
- public static class BookAuthor {
-
- enum BookType {
- book,
- article,
- journal
- }
-
- long authorId;
-
- @Nullable
- List<String> bookTitles;
-
- @Nullable
- List<Book> books;
-
- String authorName;
-
- BookType bookType;
-
- public BookAuthor() {}
-
- public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
- this.authorId = authorId;
- this.bookTitles = bookTitles;
- this.authorName = authorName;
- }
- }
-
- public static class RandomInputFormat extends GenericInputFormat {
- private static final long serialVersionUID = 1L;
-
- private final boolean isBook;
-
- private boolean touched = false;
-
- public RandomInputFormat(boolean isBook) {
- this.isBook = isBook;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return touched;
- }
-
- @Override
- public Record nextRecord(Record record) throws IOException {
- touched = true;
- record.setField(0, new LongValue(26382648));
-
- if (isBook) {
- Book b = new Book(123, "This is a test book", 26382648);
- record.setField(1, new SBookAvroValue(b));
- } else {
- List<String> titles = new ArrayList<String>();
- // titles.add("Title1");
- // titles.add("Title2");
- // titles.add("Title3");
-
- List<Book> books = new ArrayList<Book>();
- books.add(new Book(123, "This is a test book", 1));
- books.add(new Book(24234234, "This is a test book", 1));
- books.add(new Book(1234324, "This is a test book", 3));
-
- BookAuthor a = new BookAuthor(1, titles, "Test Author");
- a.books = books;
- a.bookType = BookAuthor.BookType.journal;
- record.setField(1, new SBookAuthorValue(a));
- }
-
- return record;
- }
- }
-
- public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void configure(Configuration parameters) {}
-
- @Override
- public void open(int taskNumber, int numTasks) {}
-
- @Override
- public void writeRecord(Record record) throws IOException {
- long key = record.getField(0, LongValue.class).getValue();
- String val = record.getField(1, StringValue.class).getValue();
- System.out.println(key + " : " + val);
- }
-
- @Override
- public void close() {}
- }
-
- public static class MyCoGrouper extends CoGroupFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-
- Record r1 = null;
- while (records1.hasNext()) {
- r1 = records1.next();
- }
- Record r2 = null;
- while (records2.hasNext()) {
- r2 = records2.next();
- }
-
- if (r1 != null) {
- r1.getField(1, SBookAvroValue.class).datum();
- }
-
- if (r2 != null) {
- r2.getField(1, SBookAuthorValue.class).datum();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
index 78ff2f1..74b5397 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -31,8 +31,8 @@ import java.util.Random;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.java.record.io.avro.generated.Colors;
-import org.apache.flink.api.java.record.io.avro.generated.User;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.util.StringUtils;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 31e083f..25e2e0c 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -39,7 +39,6 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.avro.AvroBaseValue;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -125,18 +124,6 @@ public class AvroExternalJarProgram {
}
}
-
- public static final class SUser extends AvroBaseValue<MyUser> {
-
- static final long serialVersionUID = 1L;
-
- public SUser() {}
-
- public SUser(MyUser u) {
- super(u);
- }
- }
-
// --------------------------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
new file mode 100644
index 0000000..d8d8b46
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the avro input format.
+ * (The testcase is mostly the getting started tutorial of avro)
+ * http://avro.apache.org/docs/current/gettingstartedjava.html
+ */
+public class AvroRecordInputFormatTest {
+
+ private File testFile;
+
+ final static String TEST_NAME = "Alyssa";
+
+ final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+ final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+ final static boolean TEST_ARRAY_BOOLEAN_1 = true;
+ final static boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+ final static Colors TEST_ENUM_COLOR = Colors.GREEN;
+
+ final static String TEST_MAP_KEY1 = "KEY 1";
+ final static long TEST_MAP_VALUE1 = 8546456L;
+ final static String TEST_MAP_KEY2 = "KEY 2";
+ final static long TEST_MAP_VALUE2 = 17554L;
+
+ @Before
+ public void createFiles() throws IOException {
+ testFile = File.createTempFile("AvroInputFormatTest", null);
+
+ ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+ stringArray.add(TEST_ARRAY_STRING_1);
+ stringArray.add(TEST_ARRAY_STRING_2);
+
+ ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+ booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+ booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+
+ HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+ longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+ longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+
+
+ User user1 = new User();
+ user1.setName(TEST_NAME);
+ user1.setFavoriteNumber(256);
+ user1.setTypeDoubleTest(123.45d);
+ user1.setTypeBoolTest(true);
+ user1.setTypeArrayString(stringArray);
+ user1.setTypeArrayBoolean(booleanArray);
+ user1.setTypeEnum(TEST_ENUM_COLOR);
+ user1.setTypeMap(longMap);
+
+ // Construct via builder
+ User user2 = User.newBuilder()
+ .setName("Charlie")
+ .setFavoriteColor("blue")
+ .setFavoriteNumber(null)
+ .setTypeBoolTest(false)
+ .setTypeDoubleTest(1.337d)
+ .setTypeNullTest(null)
+ .setTypeLongTest(1337L)
+ .setTypeArrayString(new ArrayList<CharSequence>())
+ .setTypeArrayBoolean(new ArrayList<Boolean>())
+ .setTypeNullableArray(null)
+ .setTypeEnum(Colors.RED)
+ .setTypeMap(new HashMap<CharSequence, Long>())
+ .build();
+ DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
+ DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+ dataFileWriter.create(user1.getSchema(), testFile);
+ dataFileWriter.append(user1);
+ dataFileWriter.append(user2);
+ dataFileWriter.close();
+ }
+
+ @Test
+ public void testDeserialisation() throws IOException {
+ Configuration parameters = new Configuration();
+
+ AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+
+ format.configure(parameters);
+ FileInputSplit[] splits = format.createInputSplits(1);
+ assertEquals(splits.length, 1);
+ format.open(splits[0]);
+
+ User u = format.nextRecord(null);
+ assertNotNull(u);
+
+ String name = u.getName().toString();
+ assertNotNull("empty record", name);
+ assertEquals("name not equal", TEST_NAME, name);
+
+ // check arrays
+ List<CharSequence> sl = u.getTypeArrayString();
+ assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+ assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+
+ List<Boolean> bl = u.getTypeArrayBoolean();
+ assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+ assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+
+ // check enums
+ Colors enumValue = u.getTypeEnum();
+ assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+
+ // check maps
+ Map<CharSequence, Long> lm = u.getTypeMap();
+ assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+ assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+
+ assertFalse("expecting second element", format.reachedEnd());
+ assertNotNull("expecting second element", format.nextRecord(u));
+
+ assertNull(format.nextRecord(u));
+ assertTrue(format.reachedEnd());
+
+ format.close();
+ }
+
+ @After
+ public void deleteFiles() {
+ testFile.delete();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
new file mode 100644
index 0000000..58e1f5c
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.generated;
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public enum Colors {
+ RED, GREEN, BLUE ;
+ public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
+ public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+}
[9/9] git commit: [FLINK-1118] Exclude log (and crash) files from rat
checks.
Posted by se...@apache.org.
[FLINK-1118] Exclude log (and crash) files from rat checks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/15e59906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/15e59906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/15e59906
Branch: refs/heads/master
Commit: 15e5990624d7ae6a0c17beae04daf4bbfa63e142
Parents: 719e288
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 30 14:36:09 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:59 2014 +0200
----------------------------------------------------------------------
pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15e59906/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 081ccc0..3077778 100644
--- a/pom.xml
+++ b/pom.xml
@@ -535,6 +535,7 @@ under the License.
<!-- Additional files like .gitignore etc.-->
<exclude>**/.*</exclude>
<exclude>**/*.prefs</exclude>
+ <exclude>**/*.log</exclude>
<!-- Resource files which have values. -->
<exclude>**/resources/**</exclude>
<!-- Configuration Files. -->
[6/9] git commit: Major depedendency cleanups - add "flink-shading"
for shaded libraries (currently guava only) - relocate guava in all projects
and exclude it from the lib and exported dependencies - Exlude unnecessary
transitive dependencies from
Posted by se...@apache.org.
Major depedendency cleanups
- add "flink-shading" for shaded libraries (currently guava only)
- relocate guava in all projects and exclude it from the lib and exported dependencies
- Exlude unnecessary transitive dependencies from hadoop 1
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/758de7db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/758de7db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/758de7db
Branch: refs/heads/master
Commit: 758de7dbcd5148e17d94a45c5f6b74016b013a70
Parents: 7e43b1c
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 29 20:39:09 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200
----------------------------------------------------------------------
.../flink-streaming-connectors/pom.xml | 51 +++++-
.../flink-streaming-core/pom.xml | 2 +-
.../flink-streaming-examples/pom.xml | 2 +-
flink-addons/flink-streaming/pom.xml | 20 ---
flink-core/pom.xml | 8 +
.../api/common/accumulators/Histogram.java | 5 +-
flink-dist/pom.xml | 12 ++
flink-dist/src/main/assemblies/bin.xml | 27 +--
flink-dist/src/main/flink-bin/bin/flink | 6 -
flink-dist/src/main/flink-bin/bin/webclient.sh | 6 -
flink-runtime/pom.xml | 28 +++
flink-shaded/pom.xml | 80 +++++++++
pom.xml | 172 +++++++++++--------
13 files changed, 272 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index 9777477..e941112 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -99,8 +99,8 @@ under the License.
<artifactId>spymemcached</artifactId>
<version>2.8.4</version>
</dependency>
-
- </dependencies>
+
+ </dependencies>
<build>
<plugins>
@@ -115,14 +115,47 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <!-- Relocate the entire Google Guava dependency into a different namespace -->
<plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- </plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+ <artifactSet>
+ <includes>
+ <include>org.apache.flume:*</include>
+ <include>com.twitter:*</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+ </relocation>
+ </relocations>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>build.properties</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </plugin>
+
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-addons/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
index 0be9839..63b1145 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
@@ -46,7 +46,7 @@ under the License.
<groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
<version>2.2</version>
- </dependency>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 7c02f27..228d69c 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -48,7 +48,7 @@ under the License.
<version>${project.version}</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-addons/flink-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml
index a354df6..2fcf5a8 100644
--- a/flink-addons/flink-streaming/pom.xml
+++ b/flink-addons/flink-streaming/pom.xml
@@ -44,51 +44,31 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
- <type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-compiler</artifactId>
<version>${project.version}</version>
- <type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${project.version}</version>
- <type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${project.version}</version>
- <type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
- <type>jar</type>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.3.2</version>
- <type>jar</type>
- </dependency>
-
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index e0c0a2f..b3ca398 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -34,6 +34,14 @@ under the License.
<packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
index 857dada..6f5c0e9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
@@ -21,12 +21,11 @@ package org.apache.flink.api.common.accumulators;
import java.io.IOException;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import com.google.common.collect.Maps;
-
/**
* Histogram for discrete-data. Let's you populate a histogram distributedly.
* Implemented as a Integer->Integer TreeMap, so that the entries are sorted
@@ -39,7 +38,7 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
private static final long serialVersionUID = 1L;
- private Map<Integer, Integer> treeMap = Maps.newTreeMap();
+ private Map<Integer, Integer> treeMap = new TreeMap<Integer, Integer>();
@Override
public void add(Integer value) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index b9aee28..d39b5ac 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -37,6 +37,18 @@ under the License.
<!-- BINARIES -->
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 90a3bcd..3353515 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -41,37 +41,12 @@ under the License.
<excludes>
<exclude>org.apache.flink:flink-java-examples</exclude>
<exclude>org.apache.flink:flink-scala-examples</exclude>
+ <exclude>org.apache.flink:flink-streaming-examples</exclude>
<!--
<exclude>**/*javadoc*</exclude>
<exclude>**/*sources*</exclude>
-->
- <!--
- This is a hardcoded exclude-list containing all libraries that are exclusively used in pact-clients.
- The previous command did not work because it also excludes libraries that should be in lib, such as commons-io.
- -->
- <exclude>commons-fileupload:commons-fileupload</exclude>
- </excludes>
- </dependencySet>
-
- <dependencySet>
- <outputDirectory>lib_clients</outputDirectory>
- <unpack>false</unpack>
- <useTransitiveDependencies>true</useTransitiveDependencies>
- <useProjectArtifact>false</useProjectArtifact>
- <useProjectAttachments>false</useProjectAttachments>
- <useTransitiveFiltering>true</useTransitiveFiltering>
-
- <includes>
- <include>org.apache.flink:flink-clients:**</include>
- </includes>
-
- <!--
- <excludes>
- <exclude>**/*examples*.jar</exclude>
- <exclude>**/*javadoc*</exclude>
- <exclude>**/*sources*</exclude>
</excludes>
- -->
</dependencySet>
</dependencySets>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-dist/src/main/flink-bin/bin/flink
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink b/flink-dist/src/main/flink-bin/bin/flink
index 4b4cd54..1dbc4f3 100755
--- a/flink-dist/src/main/flink-bin/bin/flink
+++ b/flink-dist/src/main/flink-bin/bin/flink
@@ -27,8 +27,6 @@ if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi
-FLINK_LIB_CLIENTS_DIR=$FLINK_ROOT_DIR/lib_clients
-
# auxiliary function to construct a lightweight classpath for the
# Flink CLI client
@@ -41,10 +39,6 @@ constructCLIClientClassPath() {
CC_CLASSPATH=$CC_CLASSPATH:$jarfile
fi
done
-
- for jarfile in $FLINK_LIB_CLIENTS_DIR/*.jar ; do
- CC_CLASSPATH=$CC_CLASSPATH:$jarfile
- done
echo $CC_CLASSPATH
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-dist/src/main/flink-bin/bin/webclient.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/webclient.sh b/flink-dist/src/main/flink-bin/bin/webclient.sh
index b45001a..cda41bd 100755
--- a/flink-dist/src/main/flink-bin/bin/webclient.sh
+++ b/flink-dist/src/main/flink-bin/bin/webclient.sh
@@ -30,8 +30,6 @@ if [ "$FLINK_IDENT_STRING" = "" ]; then
FLINK_IDENT_STRING="$USER"
fi
-FLINK_LIB_CLIENTS_DIR=$FLINK_ROOT_DIR/lib_clients
-
JVM_ARGS="$JVM_ARGS -Xmx512m"
# auxilliary function to construct the classpath for the webclient
@@ -44,10 +42,6 @@ constructWebclientClassPath() {
FLINK_WEBCLIENT_CLASSPATH=$FLINK_WEBCLIENT_CLASSPATH:$jarfile
fi
done
-
- for jarfile in "$FLINK_LIB_CLIENTS_DIR"/*.jar ; do
- FLINK_WEBCLIENT_CLASSPATH=$FLINK_WEBCLIENT_CLASSPATH:$jarfile
- done
echo $FLINK_WEBCLIENT_CLASSPATH
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 2a36dc8..b84a4a0 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -184,6 +184,34 @@ under the License.
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-shaded/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded/pom.xml b/flink-shaded/pom.xml
new file mode 100644
index 0000000..4d117ac
--- /dev/null
+++ b/flink-shaded/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>0.7-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-shaded</artifactId>
+ <name>flink-shaded</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Relocate the entire Google Guava dependency into a different namespace -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+ <artifactSet>
+ <includes>
+ <include>com.google.guava:guava</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 99980ea..3c0b9b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@ under the License.
</scm>
<modules>
+ <module>flink-shaded</module>
<module>flink-core</module>
<module>flink-java</module>
<module>flink-scala</module>
@@ -83,13 +84,13 @@ under the License.
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
- <version>3.1</version>
+ <version>3.3.2</version>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
</dependency>
<dependency>
@@ -103,13 +104,12 @@ under the License.
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
-
+
+ <!-- global provided guava dependency, prevents guava from being packaged. we use a shaded variant instead -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>17.0</version>
- <type>jar</type>
- <scope>compile</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -151,8 +151,64 @@ under the License.
<type>jar</type>
<scope>test</scope>
</dependency>
-
</dependencies>
+
+ <!-- this section defines the module versions that are used if nothing else is specified. -->
+ <dependencyManagement>
+ <dependencies>
+
+ <!-- Make sure we use a consistent guava version throughout the project -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>17.0</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- Make sure we use a consistent jetty version throughout the project -->
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>8.0.0.M1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-security</artifactId>
+ <version>8.0.0.M1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>8.0.0.M1</version>
+ </dependency>
+
+ <!-- Make sure we use a consistent avro version throughout the project -->
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.7.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <version>1.7.6</version>
+ </dependency>
+
+ <!-- Managed dependency required for HBase in flink-hbase -->
+ <dependency>
+ <groupId>org.javassist</groupId>
+ <artifactId>javassist</artifactId>
+ <version>3.18.1-GA</version>
+ </dependency>
+
+ <!-- stax is pulled in different versions by different transitive dependencies-->
+ <dependency>
+ <groupId>stax</groupId>
+ <artifactId>stax-api</artifactId>
+ <version>1.0.1</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
<profiles>
<profile>
@@ -359,45 +415,6 @@ under the License.
</profile>
</profiles>
- <dependencyManagement>
- <!-- this section defines the module versions that are used if nothing else is specified. -->
- <dependencies>
- <!-- Make sure we use a consistent jetty version throughout the project -->
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- <version>8.0.0.M1</version>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-security</artifactId>
- <version>8.0.0.M1</version>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
- <version>8.0.0.M1</version>
- </dependency>
- <!-- Make sure we use a consistent avro version throughout the project -->
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>1.7.6</version>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <version>1.7.6</version>
- </dependency>
- <!-- Managed dependency required for HBase in pact-hbase -->
- <dependency>
- <groupId>org.javassist</groupId>
- <artifactId>javassist</artifactId>
- <version>3.18.1-GA</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
<reporting>
<plugins>
<!-- execution of Unit Tests -->
@@ -424,33 +441,6 @@ under the License.
<build>
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <createDependencyReducedPom>false</createDependencyReducedPom>
- <artifactSet>
- <includes>
- <include>com.google.guava:guava</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>com.google</pattern>
- <shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
- </plugin>
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version><!--$NO-MVN-MAN-VER$-->
@@ -463,6 +453,37 @@ under the License.
</archive>
</configuration>
</plugin>
+
+ <!-- Relocate references to Google Guava classes into a different namespace -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <artifactSet>
+ <includes>
+ <include>org.apache.flink:${project.artifact}</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </plugin>
+
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
@@ -531,6 +552,7 @@ under the License.
<exclude>tools/maven/checkstyle.xml</exclude>
<exclude>tools/maven/scalastyle-config.xml</exclude>
<exclude>**/scalastyle-output.xml</exclude>
+ <exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>tools/maven/suppressions.xml</exclude>
<exclude>**/pom.xml</exclude>
<exclude>**/pom.hadoop2.xml</exclude>
[5/9] git commit: Deactivate shade plugin for quickstart projects
Posted by se...@apache.org.
Deactivate shade plugin for quickstart projects
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/38e4755a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/38e4755a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/38e4755a
Branch: refs/heads/master
Commit: 38e4755aa49e9849103f1e0ac35c04fe0b638bdb
Parents: 758de7d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 29 22:27:35 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200
----------------------------------------------------------------------
flink-quickstart/pom.xml | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/38e4755a/flink-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index fbd0007..cf06a26 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -57,11 +57,23 @@ under the License.
<plugins>
<plugin>
<artifactId>maven-archetype-plugin</artifactId>
- <version>2.2</version>
+ <version>2.2</version><!--$NO-MVN-MAN-VER$-->
<configuration>
<skip>${skipTests}</skip>
</configuration>
</plugin>
+
+ <!-- deactivate the shade plugin for the quickstart scripts -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase/>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
</project>
\ No newline at end of file
[8/9] git commit: - Allow dependencies to retain guava - Clean
dependencies in flink-streaming connectors
Posted by se...@apache.org.
- Allow dependencies to retain guava
- Clean dependencies in flink-streaming connectors
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/719e2889
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/719e2889
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/719e2889
Branch: refs/heads/master
Commit: 719e2889395d7565bab4e7b0d994e2db32e592ba
Parents: 626d6b7
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 30 14:16:16 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:59 2014 +0200
----------------------------------------------------------------------
flink-addons/flink-avro/pom.xml | 1 +
flink-addons/flink-hadoop-compatibility/pom.xml | 34 +++++
.../mapred/record/HadoopDataSink.java | 18 +--
.../mapred/record/HadoopDataSource.java | 14 +-
flink-addons/flink-hbase/pom.xml | 37 +++--
.../flink-streaming-connectors/pom.xml | 12 +-
flink-addons/flink-yarn/pom.xml | 144 +++++++++++++++++++
flink-clients/pom.xml | 15 +-
flink-compiler/pom.xml | 8 ++
flink-core/pom.xml | 20 ++-
.../typeutils/SerializerTestInstance.java | 4 +-
flink-dist/pom.xml | 18 +--
flink-dist/src/main/assemblies/bin.xml | 3 +
flink-dist/src/main/assemblies/yarn-uberjar.xml | 1 +
flink-dist/src/main/assemblies/yarn.xml | 2 +
flink-java/pom.xml | 57 +++++---
flink-runtime/pom.xml | 77 +++++++++-
flink-scala/pom.xml | 8 ++
flink-shaded/pom.xml | 1 +
flink-test-utils/pom.xml | 8 ++
flink-tests/pom.xml | 8 ++
pom.xml | 32 ++---
22 files changed, 432 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml
index ff4decc..2d22507 100644
--- a/flink-addons/flink-avro/pom.xml
+++ b/flink-addons/flink-avro/pom.xml
@@ -58,6 +58,7 @@ under the License.
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
+ <version>${guava.version}</version>
<scope>provided</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/pom.xml b/flink-addons/flink-hadoop-compatibility/pom.xml
index a8ebe34..d5c42c2 100644
--- a/flink-addons/flink-hadoop-compatibility/pom.xml
+++ b/flink-addons/flink-hadoop-compatibility/pom.xml
@@ -75,6 +75,40 @@ under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
</profile>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
index 38e166a..ff28a59 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
@@ -19,6 +19,7 @@
package org.apache.flink.hadoopcompatibility.mapred.record;
+import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.operators.Operator;
@@ -31,9 +32,6 @@ import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
/**
* The HadoopDataSink is a generic wrapper for all Hadoop OutputFormats.
*
@@ -54,7 +52,7 @@ public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable
private JobConf jobConf;
public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
- this(hadoopFormat, jobConf, name, ImmutableList.<Operator<Record>>of(input), conv, keyClass, valueClass);
+ this(hadoopFormat, jobConf, name, Collections.<Operator<Record>>singletonList(input), conv, keyClass, valueClass);
}
public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
@@ -74,8 +72,11 @@ public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable
@SuppressWarnings("deprecation")
public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
super(new HadoopRecordOutputFormat<K,V>(hadoopFormat, jobConf, conv),input, name);
- Preconditions.checkNotNull(hadoopFormat);
- Preconditions.checkNotNull(jobConf);
+
+ if (hadoopFormat == null || jobConf == null) {
+ throw new NullPointerException();
+ }
+
this.name = name;
this.jobConf = jobConf;
jobConf.setOutputKeyClass(keyClass);
@@ -101,7 +102,8 @@ public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable
@Override
public void check() {
// see for more details https://github.com/stratosphere/stratosphere/pull/531
- Preconditions.checkNotNull(FileOutputFormat.getOutputPath(jobConf), "The HadoopDataSink currently expects a correct outputPath.");
+ if (FileOutputFormat.getOutputPath(jobConf) == null) {
+ throw new NullPointerException("The HadoopDataSink currently expects a correct outputPath.");
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
index 4f56289..508f069 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.hadoopcompatibility.mapred.record;
@@ -26,10 +25,6 @@ import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeCo
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
-import com.google.common.base.Preconditions;
-
-
-
/**
* The HadoopDataSource is a generic wrapper for all Hadoop InputFormats.
*
@@ -44,7 +39,6 @@ import com.google.common.base.Preconditions;
* The HadoopDataSource provides two different standard converters:
* * WritableWrapperConverter: Converts Hadoop Types to a record that contains a WritableComparableWrapper (key) and a WritableWrapper
* * DefaultHadoopTypeConverter: Converts the standard hadoop types (longWritable, Text) to Flinks's {@link org.apache.flink.types.Value} types.
- *
*/
public class HadoopDataSource<K,V> extends GenericDataSource<HadoopRecordInputFormat<K,V>> {
@@ -61,9 +55,11 @@ public class HadoopDataSource<K,V> extends GenericDataSource<HadoopRecordInputFo
*/
public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter<K,V> conv) {
super(new HadoopRecordInputFormat<K,V>(hadoopFormat, jobConf, conv),name);
- Preconditions.checkNotNull(hadoopFormat);
- Preconditions.checkNotNull(jobConf);
- Preconditions.checkNotNull(conv);
+
+ if (hadoopFormat == null || jobConf == null || conv == null) {
+ throw new NullPointerException();
+ }
+
this.name = name;
this.jobConf = jobConf;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml
index 5ae9174..c21c3ae 100644
--- a/flink-addons/flink-hbase/pom.xml
+++ b/flink-addons/flink-hbase/pom.xml
@@ -85,6 +85,34 @@ under the License.
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
@@ -104,13 +132,4 @@ under the License.
to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009
for description of hadoop-clients -->
- <reporting>
- <plugins>
- </plugins>
- </reporting>
-
- <build>
- <plugins>
- </plugins>
- </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index e941112..51a1621 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -59,6 +59,10 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -72,9 +76,15 @@ under the License.
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.5.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
- <dependency>
+ <dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId>
<version>2.2.0</version>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/pom.xml b/flink-addons/flink-yarn/pom.xml
index 1f6e8f8..3b6e631 100644
--- a/flink-addons/flink-yarn/pom.xml
+++ b/flink-addons/flink-yarn/pom.xml
@@ -51,28 +51,172 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <!-- guava needs to be in "provided" scope, so to not re-export the dependency -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 9dccb85..dcb0add 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -95,6 +95,19 @@ under the License.
<version>2.4</version>
<scope>compile</scope>
</dependency>
+
+ <!-- guava needs to be in "provided" scope, so to not re-export the dependency -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
</dependencies>
<!-- More information on this:
@@ -198,8 +211,6 @@ under the License.
<ignore/>
</action>
</pluginExecution>
-
-
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-compiler/pom.xml
----------------------------------------------------------------------
diff --git a/flink-compiler/pom.xml b/flink-compiler/pom.xml
index fa4c0e9..3727548 100644
--- a/flink-compiler/pom.xml
+++ b/flink-compiler/pom.xml
@@ -56,6 +56,14 @@ under the License.
<scope>compile</scope>
</dependency>
+ <!-- guava needs to be in "provided" scope, so to not re-export the dependency -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index b3ca398..c1e1b18 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -36,9 +36,23 @@ under the License.
<dependencies>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-shaded</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- guava needs to be in "provided" scope, so to not re-export the dependency -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
index 5b63633..c48e879 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
@@ -16,14 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.typeutils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-public class
- SerializerTestInstance<T> extends SerializerTestBase<T> {
+public class SerializerTestInstance<T> extends SerializerTestBase<T> {
private final TypeSerializer<T> serializer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index d39b5ac..29f3341 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -117,6 +117,12 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -137,7 +143,7 @@ under the License.
</property>
</activation>
<dependencies>
- <!-- No extra dependencies: pact-hbase is currently not compatible with Hadoop v1 -->
+ <!-- No extra dependencies: flink-hbase is currently not compatible with Hadoop v1 -->
</dependencies>
</profile>
@@ -310,16 +316,6 @@ under the License.
</mapper>
</data>
<data>
- <src>${project.build.directory}/${project.artifactId}-${project.version}-bin/flink-${project.version}/lib_clients</src>
- <type>directory</type>
- <mapper>
- <type>perm</type>
- <prefix>/usr/share/${project.artifactId}/lib_clients</prefix>
- <user>root</user>
- <group>root</group>
- </mapper>
- </data>
- <data>
<src>${project.build.directory}/${project.artifactId}-${project.version}-bin/flink-${project.version}/resources</src>
<type>directory</type>
<mapper>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 3353515..0dc0196 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -140,6 +140,7 @@ under the License.
</includes>
<excludes>
<exclude>flink-java-examples-${project.version}.jar</exclude>
+ <exclude>original-flink-java-examples-${project.version}.jar</exclude>
<exclude>flink-java-examples-${project.version}-sources.jar</exclude>
</excludes>
</fileSet>
@@ -154,7 +155,9 @@ under the License.
</includes>
<excludes>
<exclude>flink-scala-examples-${project.version}.jar</exclude>
+ <exclude>original-flink-scala-examples-${project.version}.jar</exclude>
<exclude>flink-scala-examples-${project.version}-sources.jar</exclude>
+ <exclude>flink-scala-examples-${project.version}-javadoc.jar</exclude>
</excludes>
</fileSet>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-dist/src/main/assemblies/yarn-uberjar.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/yarn-uberjar.xml b/flink-dist/src/main/assemblies/yarn-uberjar.xml
index 0bb1ea2..54be9de 100644
--- a/flink-dist/src/main/assemblies/yarn-uberjar.xml
+++ b/flink-dist/src/main/assemblies/yarn-uberjar.xml
@@ -39,6 +39,7 @@ under the License.
<excludes>
<exclude>org.apache.flink:flink-java-examples:*</exclude>
<exclude>org.apache.flink:flink-scala-examples:*</exclude>
+ <exclude>org.apache.flink:flink-streaming-examples:*</exclude>
</excludes>
</dependencySet>
</dependencySets>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-dist/src/main/assemblies/yarn.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/yarn.xml b/flink-dist/src/main/assemblies/yarn.xml
index 1d062fe..6cb7d3d 100644
--- a/flink-dist/src/main/assemblies/yarn.xml
+++ b/flink-dist/src/main/assemblies/yarn.xml
@@ -87,6 +87,7 @@ under the License.
</includes>
<excludes>
<exclude>flink-java-examples-${project.version}.jar</exclude>
+ <exclude>original-flink-java-examples-${project.version}.jar</exclude>
<exclude>flink-java-examples-${project.version}-sources.jar</exclude>
</excludes>
</fileSet>
@@ -101,6 +102,7 @@ under the License.
</includes>
<excludes>
<exclude>flink-scala-examples-${project.version}.jar</exclude>
+ <exclude>original-flink-scala-examples-${project.version}.jar</exclude>
<exclude>flink-scala-examples-${project.version}-sources.jar</exclude>
</excludes>
</fileSet>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 5165a78..4b85537 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -40,39 +40,50 @@ under the License.
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
+
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<!-- version is derived from base module -->
</dependency>
+
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.24.0</version>
</dependency>
+
+ <!-- guava needs to be in "provided" scope, so to not re-export the dependency -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
- <!-- Because flink-scala uses it in tests -->
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ <!-- Because flink-scala uses it in tests -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index b84a4a0..ddaf452 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -16,7 +16,6 @@ software distributed under the License is distributed on an
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
@@ -45,7 +44,14 @@ under the License.
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
- <version>1.2</version>
+ </dependency>
+
+ <!-- guava needs to be in "provided" scope, so to not re-export the dependency -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -186,7 +192,7 @@ under the License.
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
+ <artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
@@ -225,7 +231,6 @@ under the License.
</property>
</activation>
<dependencies>
- <!-- YARN -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@@ -234,6 +239,38 @@ under the License.
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -244,6 +281,38 @@ under the License.
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index e0b9aeb..87bd3fb 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -90,6 +90,14 @@ under the License.
<artifactId>asm</artifactId>
<version>4.0</version>
</dependency>
+
+ <!-- guava needs to be in "provided" scope, so to not re-export the dependency -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.scalatest</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-shaded/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded/pom.xml b/flink-shaded/pom.xml
index 4d117ac..50f9749 100644
--- a/flink-shaded/pom.xml
+++ b/flink-shaded/pom.xml
@@ -38,6 +38,7 @@ under the License.
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
+ <version>${guava.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml
index 76a5b7d..acf6e85 100644
--- a/flink-test-utils/pom.xml
+++ b/flink-test-utils/pom.xml
@@ -65,5 +65,13 @@ under the License.
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
+
+ <!-- guava needs to be in "provided" scope, so to not re-export the dependency -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index fb36226..f994924 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -91,6 +91,14 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <!-- guava needs to be in "provided" scope, so to not re-export the dependency -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3c0b9b7..081ccc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@ under the License.
<flink.forkCount>1.5C</flink.forkCount>
<flink.reuseForks>true</flink.reuseForks>
<slf4j.version>1.7.7</slf4j.version>
+ <guava.version>17.0</guava.version>
</properties>
<dependencies>
@@ -104,13 +105,6 @@ under the License.
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
-
- <!-- global provided guava dependency, prevents guava from being packaged. we use a shaded variant instead -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>junit</groupId>
@@ -156,15 +150,7 @@ under the License.
<!-- this section defines the module versions that are used if nothing else is specified. -->
<dependencyManagement>
<dependencies>
-
- <!-- Make sure we use a consistent guava version throughout the project -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>17.0</version>
- <scope>compile</scope>
- </dependency>
-
+
<!-- Make sure we use a consistent jetty version throughout the project -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -181,6 +167,11 @@ under the License.
<artifactId>jetty-servlet</artifactId>
<version>8.0.0.M1</version>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ <version>8.0.0.M1</version>
+ </dependency>
<!-- Make sure we use a consistent avro version throughout the project -->
<dependency>
@@ -190,10 +181,17 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
+ <artifactId>avro-ipc</artifactId>
<version>1.7.6</version>
</dependency>
+ <!-- Make sure we use a consistent commons-cli version throughout the project -->
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+
<!-- Managed dependency required for HBase in flink-hbase -->
<dependency>
<groupId>org.javassist</groupId>