You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/30 19:59:00 UTC

[1/9] [FLINK-1117] Clean up flink-avro project: remove deprecated AvroRecord format, migrate tests to new java api.

Repository: incubator-flink
Updated Branches:
  refs/heads/master e7c4c8586 -> 15e599062


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
new file mode 100644
index 0000000..505857e
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.generated;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+  @Deprecated public java.lang.CharSequence name;
+  @Deprecated public java.lang.Integer favorite_number;
+  @Deprecated public java.lang.CharSequence favorite_color;
+  @Deprecated public java.lang.Long type_long_test;
+  @Deprecated public java.lang.Object type_double_test;
+  @Deprecated public java.lang.Object type_null_test;
+  @Deprecated public java.lang.Object type_bool_test;
+  @Deprecated public java.util.List<java.lang.CharSequence> type_array_string;
+  @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean;
+  @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array;
+  @Deprecated public org.apache.flink.api.io.avro.generated.Colors type_enum;
+  @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use {@link \#newBuilder()}. 
+   */
+  public User() {}
+
+  /**
+   * All-args constructor.
+   */
+  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) {
+    this.name = name;
+    this.favorite_number = favorite_number;
+    this.favorite_color = favorite_color;
+    this.type_long_test = type_long_test;
+    this.type_double_test = type_double_test;
+    this.type_null_test = type_null_test;
+    this.type_bool_test = type_bool_test;
+    this.type_array_string = type_array_string;
+    this.type_array_boolean = type_array_boolean;
+    this.type_nullable_array = type_nullable_array;
+    this.type_enum = type_enum;
+    this.type_map = type_map;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call. 
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return name;
+    case 1: return favorite_number;
+    case 2: return favorite_color;
+    case 3: return type_long_test;
+    case 4: return type_double_test;
+    case 5: return type_null_test;
+    case 6: return type_bool_test;
+    case 7: return type_array_string;
+    case 8: return type_array_boolean;
+    case 9: return type_nullable_array;
+    case 10: return type_enum;
+    case 11: return type_map;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call. 
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: name = (java.lang.CharSequence)value$; break;
+    case 1: favorite_number = (java.lang.Integer)value$; break;
+    case 2: favorite_color = (java.lang.CharSequence)value$; break;
+    case 3: type_long_test = (java.lang.Long)value$; break;
+    case 4: type_double_test = (java.lang.Object)value$; break;
+    case 5: type_null_test = (java.lang.Object)value$; break;
+    case 6: type_bool_test = (java.lang.Object)value$; break;
+    case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break;
+    case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break;
+    case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break;
+    case 10: type_enum = (org.apache.flink.api.io.avro.generated.Colors)value$; break;
+    case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'name' field.
+   */
+  public java.lang.CharSequence getName() {
+    return name;
+  }
+
+  /**
+   * Sets the value of the 'name' field.
+   * @param value the value to set.
+   */
+  public void setName(java.lang.CharSequence value) {
+    this.name = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_number' field.
+   */
+  public java.lang.Integer getFavoriteNumber() {
+    return favorite_number;
+  }
+
+  /**
+   * Sets the value of the 'favorite_number' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteNumber(java.lang.Integer value) {
+    this.favorite_number = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_color' field.
+   */
+  public java.lang.CharSequence getFavoriteColor() {
+    return favorite_color;
+  }
+
+  /**
+   * Sets the value of the 'favorite_color' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteColor(java.lang.CharSequence value) {
+    this.favorite_color = value;
+  }
+
+  /**
+   * Gets the value of the 'type_long_test' field.
+   */
+  public java.lang.Long getTypeLongTest() {
+    return type_long_test;
+  }
+
+  /**
+   * Sets the value of the 'type_long_test' field.
+   * @param value the value to set.
+   */
+  public void setTypeLongTest(java.lang.Long value) {
+    this.type_long_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_double_test' field.
+   */
+  public java.lang.Object getTypeDoubleTest() {
+    return type_double_test;
+  }
+
+  /**
+   * Sets the value of the 'type_double_test' field.
+   * @param value the value to set.
+   */
+  public void setTypeDoubleTest(java.lang.Object value) {
+    this.type_double_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_null_test' field.
+   */
+  public java.lang.Object getTypeNullTest() {
+    return type_null_test;
+  }
+
+  /**
+   * Sets the value of the 'type_null_test' field.
+   * @param value the value to set.
+   */
+  public void setTypeNullTest(java.lang.Object value) {
+    this.type_null_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_bool_test' field.
+   */
+  public java.lang.Object getTypeBoolTest() {
+    return type_bool_test;
+  }
+
+  /**
+   * Sets the value of the 'type_bool_test' field.
+   * @param value the value to set.
+   */
+  public void setTypeBoolTest(java.lang.Object value) {
+    this.type_bool_test = value;
+  }
+
+  /**
+   * Gets the value of the 'type_array_string' field.
+   */
+  public java.util.List<java.lang.CharSequence> getTypeArrayString() {
+    return type_array_string;
+  }
+
+  /**
+   * Sets the value of the 'type_array_string' field.
+   * @param value the value to set.
+   */
+  public void setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
+    this.type_array_string = value;
+  }
+
+  /**
+   * Gets the value of the 'type_array_boolean' field.
+   */
+  public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
+    return type_array_boolean;
+  }
+
+  /**
+   * Sets the value of the 'type_array_boolean' field.
+   * @param value the value to set.
+   */
+  public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
+    this.type_array_boolean = value;
+  }
+
+  /**
+   * Gets the value of the 'type_nullable_array' field.
+   */
+  public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
+    return type_nullable_array;
+  }
+
+  /**
+   * Sets the value of the 'type_nullable_array' field.
+   * @param value the value to set.
+   */
+  public void setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
+    this.type_nullable_array = value;
+  }
+
+  /**
+   * Gets the value of the 'type_enum' field.
+   */
+  public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() {
+    return type_enum;
+  }
+
+  /**
+   * Sets the value of the 'type_enum' field.
+   * @param value the value to set.
+   */
+  public void setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) {
+    this.type_enum = value;
+  }
+
+  /**
+   * Gets the value of the 'type_map' field.
+   */
+  public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
+    return type_map;
+  }
+
+  /**
+   * Sets the value of the 'type_map' field.
+   * @param value the value to set.
+   */
+  public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
+    this.type_map = value;
+  }
+
+  /** Creates a new User RecordBuilder */
+  public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder() {
+    return new org.apache.flink.api.io.avro.generated.User.Builder();
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing Builder */
+  public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User.Builder other) {
+    return new org.apache.flink.api.io.avro.generated.User.Builder(other);
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing User instance */
+  public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User other) {
+    return new org.apache.flink.api.io.avro.generated.User.Builder(other);
+  }
+  
+  /**
+   * RecordBuilder for User instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
+    implements org.apache.avro.data.RecordBuilder<User> {
+
+    private java.lang.CharSequence name;
+    private java.lang.Integer favorite_number;
+    private java.lang.CharSequence favorite_color;
+    private java.lang.Long type_long_test;
+    private java.lang.Object type_double_test;
+    private java.lang.Object type_null_test;
+    private java.lang.Object type_bool_test;
+    private java.util.List<java.lang.CharSequence> type_array_string;
+    private java.util.List<java.lang.Boolean> type_array_boolean;
+    private java.util.List<java.lang.CharSequence> type_nullable_array;
+    private org.apache.flink.api.io.avro.generated.Colors type_enum;
+    private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.flink.api.io.avro.generated.User.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.flink.api.io.avro.generated.User.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.type_long_test)) {
+        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
+        fieldSetFlags()[3] = true;
+      }
+      if (isValidValue(fields()[4], other.type_double_test)) {
+        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
+        fieldSetFlags()[4] = true;
+      }
+      if (isValidValue(fields()[5], other.type_null_test)) {
+        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
+        fieldSetFlags()[5] = true;
+      }
+      if (isValidValue(fields()[6], other.type_bool_test)) {
+        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
+        fieldSetFlags()[6] = true;
+      }
+      if (isValidValue(fields()[7], other.type_array_string)) {
+        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
+        fieldSetFlags()[7] = true;
+      }
+      if (isValidValue(fields()[8], other.type_array_boolean)) {
+        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
+        fieldSetFlags()[8] = true;
+      }
+      if (isValidValue(fields()[9], other.type_nullable_array)) {
+        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
+        fieldSetFlags()[9] = true;
+      }
+      if (isValidValue(fields()[10], other.type_enum)) {
+        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
+        fieldSetFlags()[10] = true;
+      }
+      if (isValidValue(fields()[11], other.type_map)) {
+        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
+        fieldSetFlags()[11] = true;
+      }
+    }
+    
+    /** Creates a Builder by copying an existing User instance */
+    private Builder(org.apache.flink.api.io.avro.generated.User other) {
+            super(org.apache.flink.api.io.avro.generated.User.SCHEMA$);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+      if (isValidValue(fields()[3], other.type_long_test)) {
+        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
+        fieldSetFlags()[3] = true;
+      }
+      if (isValidValue(fields()[4], other.type_double_test)) {
+        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
+        fieldSetFlags()[4] = true;
+      }
+      if (isValidValue(fields()[5], other.type_null_test)) {
+        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
+        fieldSetFlags()[5] = true;
+      }
+      if (isValidValue(fields()[6], other.type_bool_test)) {
+        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
+        fieldSetFlags()[6] = true;
+      }
+      if (isValidValue(fields()[7], other.type_array_string)) {
+        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
+        fieldSetFlags()[7] = true;
+      }
+      if (isValidValue(fields()[8], other.type_array_boolean)) {
+        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
+        fieldSetFlags()[8] = true;
+      }
+      if (isValidValue(fields()[9], other.type_nullable_array)) {
+        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
+        fieldSetFlags()[9] = true;
+      }
+      if (isValidValue(fields()[10], other.type_enum)) {
+        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
+        fieldSetFlags()[10] = true;
+      }
+      if (isValidValue(fields()[11], other.type_map)) {
+        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
+        fieldSetFlags()[11] = true;
+      }
+    }
+
+    /** Gets the value of the 'name' field */
+    public java.lang.CharSequence getName() {
+      return name;
+    }
+    
+    /** Sets the value of the 'name' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setName(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.name = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'name' field has been set */
+    public boolean hasName() {
+      return fieldSetFlags()[0];
+    }
+    
+    /** Clears the value of the 'name' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearName() {
+      name = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_number' field */
+    public java.lang.Integer getFavoriteNumber() {
+      return favorite_number;
+    }
+    
+    /** Sets the value of the 'favorite_number' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) {
+      validate(fields()[1], value);
+      this.favorite_number = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_number' field has been set */
+    public boolean hasFavoriteNumber() {
+      return fieldSetFlags()[1];
+    }
+    
+    /** Clears the value of the 'favorite_number' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteNumber() {
+      favorite_number = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_color' field */
+    public java.lang.CharSequence getFavoriteColor() {
+      return favorite_color;
+    }
+    
+    /** Sets the value of the 'favorite_color' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence value) {
+      validate(fields()[2], value);
+      this.favorite_color = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_color' field has been set */
+    public boolean hasFavoriteColor() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the value of the 'favorite_color' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteColor() {
+      favorite_color = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_long_test' field */
+    public java.lang.Long getTypeLongTest() {
+      return type_long_test;
+    }
+    
+    /** Sets the value of the 'type_long_test' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) {
+      validate(fields()[3], value);
+      this.type_long_test = value;
+      fieldSetFlags()[3] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_long_test' field has been set */
+    public boolean hasTypeLongTest() {
+      return fieldSetFlags()[3];
+    }
+    
+    /** Clears the value of the 'type_long_test' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeLongTest() {
+      type_long_test = null;
+      fieldSetFlags()[3] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_double_test' field */
+    public java.lang.Object getTypeDoubleTest() {
+      return type_double_test;
+    }
+    
+    /** Sets the value of the 'type_double_test' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) {
+      validate(fields()[4], value);
+      this.type_double_test = value;
+      fieldSetFlags()[4] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_double_test' field has been set */
+    public boolean hasTypeDoubleTest() {
+      return fieldSetFlags()[4];
+    }
+    
+    /** Clears the value of the 'type_double_test' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeDoubleTest() {
+      type_double_test = null;
+      fieldSetFlags()[4] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_null_test' field */
+    public java.lang.Object getTypeNullTest() {
+      return type_null_test;
+    }
+    
+    /** Sets the value of the 'type_null_test' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) {
+      validate(fields()[5], value);
+      this.type_null_test = value;
+      fieldSetFlags()[5] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_null_test' field has been set */
+    public boolean hasTypeNullTest() {
+      return fieldSetFlags()[5];
+    }
+    
+    /** Clears the value of the 'type_null_test' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullTest() {
+      type_null_test = null;
+      fieldSetFlags()[5] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_bool_test' field */
+    public java.lang.Object getTypeBoolTest() {
+      return type_bool_test;
+    }
+    
+    /** Sets the value of the 'type_bool_test' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) {
+      validate(fields()[6], value);
+      this.type_bool_test = value;
+      fieldSetFlags()[6] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_bool_test' field has been set */
+    public boolean hasTypeBoolTest() {
+      return fieldSetFlags()[6];
+    }
+    
+    /** Clears the value of the 'type_bool_test' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeBoolTest() {
+      type_bool_test = null;
+      fieldSetFlags()[6] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_array_string' field */
+    public java.util.List<java.lang.CharSequence> getTypeArrayString() {
+      return type_array_string;
+    }
+    
+    /** Sets the value of the 'type_array_string' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
+      validate(fields()[7], value);
+      this.type_array_string = value;
+      fieldSetFlags()[7] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_array_string' field has been set */
+    public boolean hasTypeArrayString() {
+      return fieldSetFlags()[7];
+    }
+    
+    /** Clears the value of the 'type_array_string' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayString() {
+      type_array_string = null;
+      fieldSetFlags()[7] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_array_boolean' field */
+    public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
+      return type_array_boolean;
+    }
+    
+    /** Sets the value of the 'type_array_boolean' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
+      validate(fields()[8], value);
+      this.type_array_boolean = value;
+      fieldSetFlags()[8] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_array_boolean' field has been set */
+    public boolean hasTypeArrayBoolean() {
+      return fieldSetFlags()[8];
+    }
+    
+    /** Clears the value of the 'type_array_boolean' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayBoolean() {
+      type_array_boolean = null;
+      fieldSetFlags()[8] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_nullable_array' field */
+    public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
+      return type_nullable_array;
+    }
+    
+    /** Sets the value of the 'type_nullable_array' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
+      validate(fields()[9], value);
+      this.type_nullable_array = value;
+      fieldSetFlags()[9] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_nullable_array' field has been set */
+    public boolean hasTypeNullableArray() {
+      return fieldSetFlags()[9];
+    }
+    
+    /** Clears the value of the 'type_nullable_array' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullableArray() {
+      type_nullable_array = null;
+      fieldSetFlags()[9] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_enum' field */
+    public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() {
+      return type_enum;
+    }
+    
+    /** Sets the value of the 'type_enum' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) {
+      validate(fields()[10], value);
+      this.type_enum = value;
+      fieldSetFlags()[10] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_enum' field has been set */
+    public boolean hasTypeEnum() {
+      return fieldSetFlags()[10];
+    }
+    
+    /** Clears the value of the 'type_enum' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeEnum() {
+      type_enum = null;
+      fieldSetFlags()[10] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'type_map' field */
+    public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
+      return type_map;
+    }
+    
+    /** Sets the value of the 'type_map' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
+      validate(fields()[11], value);
+      this.type_map = value;
+      fieldSetFlags()[11] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'type_map' field has been set */
+    public boolean hasTypeMap() {
+      return fieldSetFlags()[11];
+    }
+    
+    /** Clears the value of the 'type_map' field */
+    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeMap() {
+      type_map = null;
+      fieldSetFlags()[11] = false;
+      return this;
+    }
+
+    @Override
+    public User build() {
+      try {
+        User record = new User();
+        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
+        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
+        record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]);
+        record.type_double_test = fieldSetFlags()[4] ? this.type_double_test : (java.lang.Object) defaultValue(fields()[4]);
+        record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]);
+        record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]);
+        record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]);
+        record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List<java.lang.Boolean>) defaultValue(fields()[8]);
+        record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[9]);
+        record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.io.avro.generated.Colors) defaultValue(fields()[10]);
+        record.type_map = fieldSetFlags()[11] ? this.type_map : (java.util.Map<java.lang.CharSequence,java.lang.Long>) defaultValue(fields()[11]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
deleted file mode 100644
index 8e0462f..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.junit.Assert;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.BooleanListValue;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.LongMapValue;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.StringListValue;
-import org.apache.flink.api.java.record.io.avro.generated.Colors;
-import org.apache.flink.api.java.record.io.avro.generated.User;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-/**
- * Test the avro input format.
- * (The testcase is mostly the getting started tutorial of avro)
- * http://avro.apache.org/docs/current/gettingstartedjava.html
- */
-public class AvroRecordInputFormatTest {
-	
-	private File testFile;
-	
-	private final AvroRecordInputFormat format = new AvroRecordInputFormat();
-	final static String TEST_NAME = "Alyssa";
-	
-	final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
-	final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-	
-	final static boolean TEST_ARRAY_BOOLEAN_1 = true;
-	final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-	
-	final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-	
-	final static CharSequence TEST_MAP_KEY1 = "KEY 1";
-	final static long TEST_MAP_VALUE1 = 8546456L;
-	final static CharSequence TEST_MAP_KEY2 = "KEY 2";
-	final static long TEST_MAP_VALUE2 = 17554L;
-
-	@Before
-	public void createFiles() throws IOException {
-		testFile = File.createTempFile("AvroInputFormatTest", null);
-		
-		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
-		stringArray.add(TEST_ARRAY_STRING_1);
-		stringArray.add(TEST_ARRAY_STRING_2);
-		
-		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
-		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
-		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-		
-		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
-		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
-		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-		
-		
-		User user1 = new User();
-		user1.setName(TEST_NAME);
-		user1.setFavoriteNumber(256);
-		user1.setTypeDoubleTest(123.45d);
-		user1.setTypeBoolTest(true);
-		user1.setTypeArrayString(stringArray);
-		user1.setTypeArrayBoolean(booleanArray);
-		user1.setTypeEnum(TEST_ENUM_COLOR);
-		user1.setTypeMap(longMap);
-	     
-		// Construct via builder
-		User user2 = User.newBuilder()
-		             .setName("Charlie")
-		             .setFavoriteColor("blue")
-		             .setFavoriteNumber(null)
-		             .setTypeBoolTest(false)
-		             .setTypeDoubleTest(1.337d)
-		             .setTypeNullTest(null)
-		             .setTypeLongTest(1337L)
-		             .setTypeArrayString(new ArrayList<CharSequence>())
-		             .setTypeArrayBoolean(new ArrayList<Boolean>())
-		             .setTypeNullableArray(null)
-		             .setTypeEnum(Colors.RED)
-		             .setTypeMap(new HashMap<CharSequence, Long>())
-		             .build();
-		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
-		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
-		dataFileWriter.create(user1.getSchema(), testFile);
-		dataFileWriter.append(user1);
-		dataFileWriter.append(user2);
-		dataFileWriter.close();
-	}
-	
-	@Test
-	public void testDeserialisation() throws IOException {
-		Configuration parameters = new Configuration();
-		format.setFilePath(testFile.toURI().toString());
-		format.configure(parameters);
-		FileInputSplit[] splits = format.createInputSplits(1);
-		Assert.assertEquals(splits.length, 1);
-		format.open(splits[0]);
-		Record record = new Record();
-		Assert.assertNotNull(format.nextRecord(record));
-		StringValue name = record.getField(0, StringValue.class);
-		Assert.assertNotNull("empty record", name);
-		Assert.assertEquals("name not equal",name.getValue(), TEST_NAME);
-		
-		// check arrays
-		StringListValue sl = record.getField(7, AvroRecordInputFormat.StringListValue.class);
-		Assert.assertEquals("element 0 not equal", sl.get(0).getValue(), TEST_ARRAY_STRING_1);
-		Assert.assertEquals("element 1 not equal", sl.get(1).getValue(), TEST_ARRAY_STRING_2);
-		
-		BooleanListValue bl = record.getField(8, AvroRecordInputFormat.BooleanListValue.class);
-		Assert.assertEquals("element 0 not equal", bl.get(0).getValue(), TEST_ARRAY_BOOLEAN_1);
-		Assert.assertEquals("element 1 not equal", bl.get(1).getValue(), TEST_ARRAY_BOOLEAN_2);
-		
-		// check enums
-		StringValue enumValue = record.getField(10, StringValue.class);
-		Assert.assertEquals("string representation of enum not equal", enumValue.getValue(), TEST_ENUM_COLOR.toString()); 
-		
-		// check maps
-		LongMapValue lm = record.getField(11, AvroRecordInputFormat.LongMapValue.class);
-		Assert.assertEquals("map value of key 1 not equal", lm.get(new StringValue(TEST_MAP_KEY1)).getValue(), TEST_MAP_VALUE1);
-		Assert.assertEquals("map value of key 2 not equal", lm.get(new StringValue(TEST_MAP_KEY2)).getValue(), TEST_MAP_VALUE2);
-		
-		
-		Assert.assertFalse("expecting second element", format.reachedEnd());
-		Assert.assertNotNull("expecting second element", format.nextRecord(record));
-		
-		Assert.assertNull(format.nextRecord(record));
-		Assert.assertTrue(format.reachedEnd());
-		
-		format.close();
-	}
-	
-	@After
-	public void deleteFiles() {
-		testFile.delete();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
deleted file mode 100644
index e5ddd20..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.generated;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public enum Colors { 
-  RED, GREEN, BLUE  ;
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
deleted file mode 100644
index 86a486a..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
+++ /dev/null
@@ -1,755 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.generated;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"n
 ame\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public java.lang.CharSequence name;
-  @Deprecated public java.lang.Integer favorite_number;
-  @Deprecated public java.lang.CharSequence favorite_color;
-  @Deprecated public java.lang.Long type_long_test;
-  @Deprecated public java.lang.Object type_double_test;
-  @Deprecated public java.lang.Object type_null_test;
-  @Deprecated public java.lang.Object type_bool_test;
-  @Deprecated public java.util.List<java.lang.CharSequence> type_array_string;
-  @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean;
-  @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array;
-  @Deprecated public org.apache.flink.api.java.record.io.avro.generated.Colors type_enum;
-  @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use {@link \#newBuilder()}. 
-   */
-  public User() {}
-
-  /**
-   * All-args constructor.
-   */
-  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.java.record.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) {
-    this.name = name;
-    this.favorite_number = favorite_number;
-    this.favorite_color = favorite_color;
-    this.type_long_test = type_long_test;
-    this.type_double_test = type_double_test;
-    this.type_null_test = type_null_test;
-    this.type_bool_test = type_bool_test;
-    this.type_array_string = type_array_string;
-    this.type_array_boolean = type_array_boolean;
-    this.type_nullable_array = type_nullable_array;
-    this.type_enum = type_enum;
-    this.type_map = type_map;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call. 
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return name;
-    case 1: return favorite_number;
-    case 2: return favorite_color;
-    case 3: return type_long_test;
-    case 4: return type_double_test;
-    case 5: return type_null_test;
-    case 6: return type_bool_test;
-    case 7: return type_array_string;
-    case 8: return type_array_boolean;
-    case 9: return type_nullable_array;
-    case 10: return type_enum;
-    case 11: return type_map;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-  // Used by DatumReader.  Applications should not call. 
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: name = (java.lang.CharSequence)value$; break;
-    case 1: favorite_number = (java.lang.Integer)value$; break;
-    case 2: favorite_color = (java.lang.CharSequence)value$; break;
-    case 3: type_long_test = (java.lang.Long)value$; break;
-    case 4: type_double_test = (java.lang.Object)value$; break;
-    case 5: type_null_test = (java.lang.Object)value$; break;
-    case 6: type_bool_test = (java.lang.Object)value$; break;
-    case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break;
-    case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break;
-    case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break;
-    case 10: type_enum = (org.apache.flink.api.java.record.io.avro.generated.Colors)value$; break;
-    case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'name' field.
-   */
-  public java.lang.CharSequence getName() {
-    return name;
-  }
-
-  /**
-   * Sets the value of the 'name' field.
-   * @param value the value to set.
-   */
-  public void setName(java.lang.CharSequence value) {
-    this.name = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_number' field.
-   */
-  public java.lang.Integer getFavoriteNumber() {
-    return favorite_number;
-  }
-
-  /**
-   * Sets the value of the 'favorite_number' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteNumber(java.lang.Integer value) {
-    this.favorite_number = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_color' field.
-   */
-  public java.lang.CharSequence getFavoriteColor() {
-    return favorite_color;
-  }
-
-  /**
-   * Sets the value of the 'favorite_color' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteColor(java.lang.CharSequence value) {
-    this.favorite_color = value;
-  }
-
-  /**
-   * Gets the value of the 'type_long_test' field.
-   */
-  public java.lang.Long getTypeLongTest() {
-    return type_long_test;
-  }
-
-  /**
-   * Sets the value of the 'type_long_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeLongTest(java.lang.Long value) {
-    this.type_long_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_double_test' field.
-   */
-  public java.lang.Object getTypeDoubleTest() {
-    return type_double_test;
-  }
-
-  /**
-   * Sets the value of the 'type_double_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeDoubleTest(java.lang.Object value) {
-    this.type_double_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_null_test' field.
-   */
-  public java.lang.Object getTypeNullTest() {
-    return type_null_test;
-  }
-
-  /**
-   * Sets the value of the 'type_null_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeNullTest(java.lang.Object value) {
-    this.type_null_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_bool_test' field.
-   */
-  public java.lang.Object getTypeBoolTest() {
-    return type_bool_test;
-  }
-
-  /**
-   * Sets the value of the 'type_bool_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeBoolTest(java.lang.Object value) {
-    this.type_bool_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_array_string' field.
-   */
-  public java.util.List<java.lang.CharSequence> getTypeArrayString() {
-    return type_array_string;
-  }
-
-  /**
-   * Sets the value of the 'type_array_string' field.
-   * @param value the value to set.
-   */
-  public void setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
-    this.type_array_string = value;
-  }
-
-  /**
-   * Gets the value of the 'type_array_boolean' field.
-   */
-  public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
-    return type_array_boolean;
-  }
-
-  /**
-   * Sets the value of the 'type_array_boolean' field.
-   * @param value the value to set.
-   */
-  public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
-    this.type_array_boolean = value;
-  }
-
-  /**
-   * Gets the value of the 'type_nullable_array' field.
-   */
-  public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
-    return type_nullable_array;
-  }
-
-  /**
-   * Sets the value of the 'type_nullable_array' field.
-   * @param value the value to set.
-   */
-  public void setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
-    this.type_nullable_array = value;
-  }
-
-  /**
-   * Gets the value of the 'type_enum' field.
-   */
-  public org.apache.flink.api.java.record.io.avro.generated.Colors getTypeEnum() {
-    return type_enum;
-  }
-
-  /**
-   * Sets the value of the 'type_enum' field.
-   * @param value the value to set.
-   */
-  public void setTypeEnum(org.apache.flink.api.java.record.io.avro.generated.Colors value) {
-    this.type_enum = value;
-  }
-
-  /**
-   * Gets the value of the 'type_map' field.
-   */
-  public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
-    return type_map;
-  }
-
-  /**
-   * Sets the value of the 'type_map' field.
-   * @param value the value to set.
-   */
-  public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
-    this.type_map = value;
-  }
-
-  /** Creates a new User RecordBuilder */
-  public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder() {
-    return new org.apache.flink.api.java.record.io.avro.generated.User.Builder();
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing Builder */
-  public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.generated.User.Builder other) {
-    return new org.apache.flink.api.java.record.io.avro.generated.User.Builder(other);
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing User instance */
-  public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.generated.User other) {
-    return new org.apache.flink.api.java.record.io.avro.generated.User.Builder(other);
-  }
-  
-  /**
-   * RecordBuilder for User instances.
-   */
-  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
-    implements org.apache.avro.data.RecordBuilder<User> {
-
-    private java.lang.CharSequence name;
-    private java.lang.Integer favorite_number;
-    private java.lang.CharSequence favorite_color;
-    private java.lang.Long type_long_test;
-    private java.lang.Object type_double_test;
-    private java.lang.Object type_null_test;
-    private java.lang.Object type_bool_test;
-    private java.util.List<java.lang.CharSequence> type_array_string;
-    private java.util.List<java.lang.Boolean> type_array_boolean;
-    private java.util.List<java.lang.CharSequence> type_nullable_array;
-    private org.apache.flink.api.java.record.io.avro.generated.Colors type_enum;
-    private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(org.apache.flink.api.java.record.io.avro.generated.User.SCHEMA$);
-    }
-    
-    /** Creates a Builder by copying an existing Builder */
-    private Builder(org.apache.flink.api.java.record.io.avro.generated.User.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-      if (isValidValue(fields()[3], other.type_long_test)) {
-        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
-        fieldSetFlags()[3] = true;
-      }
-      if (isValidValue(fields()[4], other.type_double_test)) {
-        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
-        fieldSetFlags()[4] = true;
-      }
-      if (isValidValue(fields()[5], other.type_null_test)) {
-        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
-        fieldSetFlags()[5] = true;
-      }
-      if (isValidValue(fields()[6], other.type_bool_test)) {
-        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
-        fieldSetFlags()[6] = true;
-      }
-      if (isValidValue(fields()[7], other.type_array_string)) {
-        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
-        fieldSetFlags()[7] = true;
-      }
-      if (isValidValue(fields()[8], other.type_array_boolean)) {
-        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
-        fieldSetFlags()[8] = true;
-      }
-      if (isValidValue(fields()[9], other.type_nullable_array)) {
-        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
-        fieldSetFlags()[9] = true;
-      }
-      if (isValidValue(fields()[10], other.type_enum)) {
-        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
-        fieldSetFlags()[10] = true;
-      }
-      if (isValidValue(fields()[11], other.type_map)) {
-        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
-        fieldSetFlags()[11] = true;
-      }
-    }
-    
-    /** Creates a Builder by copying an existing User instance */
-    private Builder(org.apache.flink.api.java.record.io.avro.generated.User other) {
-            super(org.apache.flink.api.java.record.io.avro.generated.User.SCHEMA$);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-      if (isValidValue(fields()[3], other.type_long_test)) {
-        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
-        fieldSetFlags()[3] = true;
-      }
-      if (isValidValue(fields()[4], other.type_double_test)) {
-        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
-        fieldSetFlags()[4] = true;
-      }
-      if (isValidValue(fields()[5], other.type_null_test)) {
-        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
-        fieldSetFlags()[5] = true;
-      }
-      if (isValidValue(fields()[6], other.type_bool_test)) {
-        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
-        fieldSetFlags()[6] = true;
-      }
-      if (isValidValue(fields()[7], other.type_array_string)) {
-        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
-        fieldSetFlags()[7] = true;
-      }
-      if (isValidValue(fields()[8], other.type_array_boolean)) {
-        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
-        fieldSetFlags()[8] = true;
-      }
-      if (isValidValue(fields()[9], other.type_nullable_array)) {
-        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
-        fieldSetFlags()[9] = true;
-      }
-      if (isValidValue(fields()[10], other.type_enum)) {
-        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
-        fieldSetFlags()[10] = true;
-      }
-      if (isValidValue(fields()[11], other.type_map)) {
-        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
-        fieldSetFlags()[11] = true;
-      }
-    }
-
-    /** Gets the value of the 'name' field */
-    public java.lang.CharSequence getName() {
-      return name;
-    }
-    
-    /** Sets the value of the 'name' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setName(java.lang.CharSequence value) {
-      validate(fields()[0], value);
-      this.name = value;
-      fieldSetFlags()[0] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'name' field has been set */
-    public boolean hasName() {
-      return fieldSetFlags()[0];
-    }
-    
-    /** Clears the value of the 'name' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearName() {
-      name = null;
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_number' field */
-    public java.lang.Integer getFavoriteNumber() {
-      return favorite_number;
-    }
-    
-    /** Sets the value of the 'favorite_number' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) {
-      validate(fields()[1], value);
-      this.favorite_number = value;
-      fieldSetFlags()[1] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_number' field has been set */
-    public boolean hasFavoriteNumber() {
-      return fieldSetFlags()[1];
-    }
-    
-    /** Clears the value of the 'favorite_number' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearFavoriteNumber() {
-      favorite_number = null;
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_color' field */
-    public java.lang.CharSequence getFavoriteColor() {
-      return favorite_color;
-    }
-    
-    /** Sets the value of the 'favorite_color' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence value) {
-      validate(fields()[2], value);
-      this.favorite_color = value;
-      fieldSetFlags()[2] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_color' field has been set */
-    public boolean hasFavoriteColor() {
-      return fieldSetFlags()[2];
-    }
-    
-    /** Clears the value of the 'favorite_color' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearFavoriteColor() {
-      favorite_color = null;
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_long_test' field */
-    public java.lang.Long getTypeLongTest() {
-      return type_long_test;
-    }
-    
-    /** Sets the value of the 'type_long_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) {
-      validate(fields()[3], value);
-      this.type_long_test = value;
-      fieldSetFlags()[3] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_long_test' field has been set */
-    public boolean hasTypeLongTest() {
-      return fieldSetFlags()[3];
-    }
-    
-    /** Clears the value of the 'type_long_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeLongTest() {
-      type_long_test = null;
-      fieldSetFlags()[3] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_double_test' field */
-    public java.lang.Object getTypeDoubleTest() {
-      return type_double_test;
-    }
-    
-    /** Sets the value of the 'type_double_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) {
-      validate(fields()[4], value);
-      this.type_double_test = value;
-      fieldSetFlags()[4] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_double_test' field has been set */
-    public boolean hasTypeDoubleTest() {
-      return fieldSetFlags()[4];
-    }
-    
-    /** Clears the value of the 'type_double_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeDoubleTest() {
-      type_double_test = null;
-      fieldSetFlags()[4] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_null_test' field */
-    public java.lang.Object getTypeNullTest() {
-      return type_null_test;
-    }
-    
-    /** Sets the value of the 'type_null_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) {
-      validate(fields()[5], value);
-      this.type_null_test = value;
-      fieldSetFlags()[5] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_null_test' field has been set */
-    public boolean hasTypeNullTest() {
-      return fieldSetFlags()[5];
-    }
-    
-    /** Clears the value of the 'type_null_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeNullTest() {
-      type_null_test = null;
-      fieldSetFlags()[5] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_bool_test' field */
-    public java.lang.Object getTypeBoolTest() {
-      return type_bool_test;
-    }
-    
-    /** Sets the value of the 'type_bool_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) {
-      validate(fields()[6], value);
-      this.type_bool_test = value;
-      fieldSetFlags()[6] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_bool_test' field has been set */
-    public boolean hasTypeBoolTest() {
-      return fieldSetFlags()[6];
-    }
-    
-    /** Clears the value of the 'type_bool_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeBoolTest() {
-      type_bool_test = null;
-      fieldSetFlags()[6] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_array_string' field */
-    public java.util.List<java.lang.CharSequence> getTypeArrayString() {
-      return type_array_string;
-    }
-    
-    /** Sets the value of the 'type_array_string' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
-      validate(fields()[7], value);
-      this.type_array_string = value;
-      fieldSetFlags()[7] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_array_string' field has been set */
-    public boolean hasTypeArrayString() {
-      return fieldSetFlags()[7];
-    }
-    
-    /** Clears the value of the 'type_array_string' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeArrayString() {
-      type_array_string = null;
-      fieldSetFlags()[7] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_array_boolean' field */
-    public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
-      return type_array_boolean;
-    }
-    
-    /** Sets the value of the 'type_array_boolean' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
-      validate(fields()[8], value);
-      this.type_array_boolean = value;
-      fieldSetFlags()[8] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_array_boolean' field has been set */
-    public boolean hasTypeArrayBoolean() {
-      return fieldSetFlags()[8];
-    }
-    
-    /** Clears the value of the 'type_array_boolean' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeArrayBoolean() {
-      type_array_boolean = null;
-      fieldSetFlags()[8] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_nullable_array' field */
-    public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
-      return type_nullable_array;
-    }
-    
-    /** Sets the value of the 'type_nullable_array' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
-      validate(fields()[9], value);
-      this.type_nullable_array = value;
-      fieldSetFlags()[9] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_nullable_array' field has been set */
-    public boolean hasTypeNullableArray() {
-      return fieldSetFlags()[9];
-    }
-    
-    /** Clears the value of the 'type_nullable_array' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeNullableArray() {
-      type_nullable_array = null;
-      fieldSetFlags()[9] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_enum' field */
-    public org.apache.flink.api.java.record.io.avro.generated.Colors getTypeEnum() {
-      return type_enum;
-    }
-    
-    /** Sets the value of the 'type_enum' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.java.record.io.avro.generated.Colors value) {
-      validate(fields()[10], value);
-      this.type_enum = value;
-      fieldSetFlags()[10] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_enum' field has been set */
-    public boolean hasTypeEnum() {
-      return fieldSetFlags()[10];
-    }
-    
-    /** Clears the value of the 'type_enum' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeEnum() {
-      type_enum = null;
-      fieldSetFlags()[10] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_map' field */
-    public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
-      return type_map;
-    }
-    
-    /** Sets the value of the 'type_map' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
-      validate(fields()[11], value);
-      this.type_map = value;
-      fieldSetFlags()[11] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_map' field has been set */
-    public boolean hasTypeMap() {
-      return fieldSetFlags()[11];
-    }
-    
-    /** Clears the value of the 'type_map' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeMap() {
-      type_map = null;
-      fieldSetFlags()[11] = false;
-      return this;
-    }
-
-    @Override
-    public User build() {
-      try {
-        User record = new User();
-        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
-        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
-        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
-        record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]);
-        record.type_double_test = fieldSetFlags()[4] ? this.type_double_test : (java.lang.Object) defaultValue(fields()[4]);
-        record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]);
-        record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]);
-        record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]);
-        record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List<java.lang.Boolean>) defaultValue(fields()[8]);
-        record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[9]);
-        record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.java.record.io.avro.generated.Colors) defaultValue(fields()[10]);
-        record.type_map = fieldSetFlags()[11] ? this.type_map : (java.util.Map<java.lang.CharSequence,java.lang.Long>) defaultValue(fields()[11]);
-        return record;
-      } catch (Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
new file mode 100644
index 0000000..8a89410
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.reflect.Nullable;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.junit.Test;
+
+public class AvroSerializerEmptyArrayTest {
+
+	@Test
+	public void testBookSerialization() {
+		try {
+			Book b = new Book(123, "This is a test book", 26382648);
+			AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class);
+			SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b);
+			test.testAll();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerialization() {
+		try {
+			List<String> titles = new ArrayList<String>();
+
+			List<Book> books = new ArrayList<Book>();
+			books.add(new Book(123, "This is a test book", 1));
+			books.add(new Book(24234234, "This is a test book", 1));
+			books.add(new Book(1234324, "This is a test book", 3));
+
+			BookAuthor a = new BookAuthor(1, titles, "Test Author");
+			a.books = books;
+			a.bookType = BookAuthor.BookType.journal;
+			
+			AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class);
+			
+			SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a);
+			test.testAll();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	public static class Book {
+
+		long bookId;
+		@Nullable
+		String title;
+		long authorId;
+
+		public Book() {}
+
+		public Book(long bookId, String title, long authorId) {
+			this.bookId = bookId;
+			this.title = title;
+			this.authorId = authorId;
+		}
+
+		@Override
+		public int hashCode() {
+			final int prime = 31;
+			int result = 1;
+			result = prime * result + (int) (authorId ^ (authorId >>> 32));
+			result = prime * result + (int) (bookId ^ (bookId >>> 32));
+			result = prime * result + ((title == null) ? 0 : title.hashCode());
+			return result;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj)
+				return true;
+			if (obj == null)
+				return false;
+			if (getClass() != obj.getClass())
+				return false;
+			Book other = (Book) obj;
+			if (authorId != other.authorId)
+				return false;
+			if (bookId != other.bookId)
+				return false;
+			if (title == null) {
+				if (other.title != null)
+					return false;
+			} else if (!title.equals(other.title))
+				return false;
+			return true;
+		}
+	}
+
+	public static class BookAuthor {
+
+		enum BookType {
+			book,
+			article,
+			journal
+		}
+
+		long authorId;
+
+		@Nullable
+		List<String> bookTitles;
+
+		@Nullable
+		List<Book> books;
+
+		String authorName;
+
+		BookType bookType;
+
+		public BookAuthor() {}
+
+		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+			this.authorId = authorId;
+			this.bookTitles = bookTitles;
+			this.authorName = authorName;
+		}
+
+		@Override
+		public int hashCode() {
+			final int prime = 31;
+			int result = 1;
+			result = prime * result + (int) (authorId ^ (authorId >>> 32));
+			result = prime * result + ((authorName == null) ? 0 : authorName.hashCode());
+			result = prime * result + ((bookTitles == null) ? 0 : bookTitles.hashCode());
+			result = prime * result + ((bookType == null) ? 0 : bookType.hashCode());
+			result = prime * result + ((books == null) ? 0 : books.hashCode());
+			return result;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj)
+				return true;
+			if (obj == null)
+				return false;
+			if (getClass() != obj.getClass())
+				return false;
+			BookAuthor other = (BookAuthor) obj;
+			if (authorId != other.authorId)
+				return false;
+			if (authorName == null) {
+				if (other.authorName != null)
+					return false;
+			} else if (!authorName.equals(other.authorName))
+				return false;
+			if (bookTitles == null) {
+				if (other.bookTitles != null)
+					return false;
+			} else if (!bookTitles.equals(other.bookTitles))
+				return false;
+			if (bookType != other.bookType)
+				return false;
+			if (books == null) {
+				if (other.books != null)
+					return false;
+			} else if (!books.equals(other.books))
+				return false;
+			return true;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 0565b5a..377cbfd 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -23,5 +23,5 @@ under the License.
 		"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
 
 <suppressions>
-		<suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]java[\\/]record[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
+		<suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
 </suppressions>
\ No newline at end of file


[3/9] git commit: add shading for guava

Posted by se...@apache.org.
add shading for guava


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7e43b1c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7e43b1c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7e43b1c7

Branch: refs/heads/master
Commit: 7e43b1c70429f901b73b0221b3a18b709365f8df
Parents: 30e84ff
Author: Robert Metzger <rm...@apache.org>
Authored: Sun Sep 28 20:54:23 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200

----------------------------------------------------------------------
 pom.xml | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7e43b1c7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bd3fd69..99980ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -424,6 +424,33 @@ under the License.
 	<build>
 		<plugins>
 			<plugin>
+		        <groupId>org.apache.maven.plugins</groupId>
+		        <artifactId>maven-shade-plugin</artifactId>
+		        <version>2.3</version>
+		        <executions>
+		          <execution>
+		            <phase>package</phase>
+		            <goals>
+		              <goal>shade</goal>
+		            </goals>
+					</execution>
+		        </executions>
+		            <configuration>
+		            	<createDependencyReducedPom>false</createDependencyReducedPom>
+			            <artifactSet>
+			                <includes>
+			                    <include>com.google.guava:guava</include>
+			                </includes>
+			            </artifactSet>
+			              <relocations>
+			                <relocation>
+			                    <pattern>com.google</pattern>
+			                    <shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+			                </relocation>
+			              </relocations>
+		            </configuration>
+		      </plugin>
+			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
 				<version>2.4</version><!--$NO-MVN-MAN-VER$-->


[7/9] git commit: RAT plugin not inherited, to prevent redundant checks.

Posted by se...@apache.org.
RAT plugin not inherited, to prevent redundant checks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6af80fb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6af80fb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6af80fb4

Branch: refs/heads/master
Commit: 6af80fb4720c74f7af3acbdf45dd7afa7d039717
Parents: e7c4c85
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 29 12:25:30 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200

----------------------------------------------------------------------
 pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6af80fb4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 624d9b8..bd3fd69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -440,6 +440,7 @@ under the License.
 				<groupId>org.apache.rat</groupId>
 				<artifactId>apache-rat-plugin</artifactId>
 				<version>0.10</version><!--$NO-MVN-MAN-VER$-->
+				<inherited>false</inherited>
 				<executions>
 					<execution>
 						<phase>verify</phase>


[4/9] git commit: [FLINK-1072] improved build matrix

Posted by se...@apache.org.
[FLINK-1072] improved build matrix


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/30e84ff7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/30e84ff7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/30e84ff7

Branch: refs/heads/master
Commit: 30e84ff7542e2915ebf6374ccb34a615723f18a9
Parents: 6af80fb
Author: Robert Metzger <rm...@apache.org>
Authored: Sun Sep 28 20:10:07 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200

----------------------------------------------------------------------
 .travis.yml | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30e84ff7/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 0b776c1..3021c94 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,10 +1,22 @@
 # s3 deployment based on http://about.travis-ci.org/blog/2012-12-18-travis-artifacts/
 
 language: java
-jdk:
-  - oraclejdk8
-  - oraclejdk7
-  - openjdk6
+
+
+#See https://issues.apache.org/jira/browse/FLINK-1072
+matrix:
+  include:
+    - jdk: "openjdk6"
+      env: PROFILE=
+    - jdk: "oraclejdk7"
+      env: PROFILE=
+    - jdk: "openjdk6"
+      env: PROFILE="-P!include-yarn -Dhadoop.profile=2 -Dhadoop.version=2.0.0-alpha"
+    - jdk: "oraclejdk7"
+      env: PROFILE="-Dhadoop.profile=2 -Dhadoop.version=2.2.0"
+    - jdk: "oraclejdk8"
+      env: PROFILE="-Dhadoop.profile=2 -Dhadoop.version=2.5.0"
+
 
 git:
   depth: 10
@@ -27,9 +39,6 @@ env:
         - secure: "SNZkMm++fvPbjdreibc4j/XTKy7rOvGvjbvJJLQ01fVDR8ur1FGB5L/CE9tm2Aye75G8br+tJ+gf5cMX8CHL+0VrvuTk5U6flbuM08Pd0pmP64ZncmGfRFKC5qTvt24YR0u3cqxWze8RTkdduz0t8xrEdyCtb94CHs1+RNS+0HA="
         # javadocs deploy
         - secure: "f4wACodae0NCaIZtmg/JvP40G/3dE7O6+ONDMbq8P/9dnT5peRB1q7+KZxggRmFSWTMPdPIDI7YxI/rM5wZ3pUN2eX0BpjfANT58OJdgCNXN3Hr9YOa8UykD2h6b+AHpy4G89MG65m73RvNXngHTCQ8SBIfewzeMAhHdD3Fk0u8="
-    matrix:
-        - PROFILE=
-        - PROFILE="-Dhadoop.profile=2"
 
 before_script:
    - "gem install --no-document --version 0.8.9 faraday "


[2/9] git commit: [FLINK-1117] Clean up flink-avro project: remove deprecated AvroRecord format, migrate tests to new java api.

Posted by se...@apache.org.
[FLINK-1117] Clean up flink-avro project: remove deprecated AvroRecord format, migrate tests to new java api.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/626d6b78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/626d6b78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/626d6b78

Branch: refs/heads/master
Commit: 626d6b785db649e06951e2f336f5ca411b30dce5
Parents: 38e4755
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 30 14:15:22 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200

----------------------------------------------------------------------
 flink-addons/flink-avro/pom.xml                 |   7 +-
 .../apache/flink/api/avro/AvroBaseValue.java    | 153 ----
 .../api/io/avro/example/AvroTypeExample.java    | 111 +++
 .../apache/flink/api/io/avro/example/User.java  | 269 +++++++
 .../java/record/io/avro/AvroInputFormat.java    | 111 ---
 .../record/io/avro/AvroRecordInputFormat.java   | 374 ---------
 .../avro/example/ReflectiveAvroTypeExample.java | 161 ----
 .../api/java/record/io/avro/example/SUser.java  |  25 -
 .../api/java/record/io/avro/example/User.java   | 269 -------
 .../flink/api/avro/AvroOutputFormatTest.java    |   2 +-
 .../api/avro/AvroWithEmptyArrayITCase.java      | 217 ------
 .../flink/api/avro/EncoderDecoderTest.java      |   4 +-
 .../avro/testjar/AvroExternalJarProgram.java    |  13 -
 .../api/io/avro/AvroRecordInputFormatTest.java  | 167 ++++
 .../flink/api/io/avro/generated/Colors.java     |  32 +
 .../flink/api/io/avro/generated/User.java       | 755 +++++++++++++++++++
 .../io/avro/AvroRecordInputFormatTest.java      | 167 ----
 .../java/record/io/avro/generated/Colors.java   |  32 -
 .../api/java/record/io/avro/generated/User.java | 755 -------------------
 .../runtime/AvroSerializerEmptyArrayTest.java   | 189 +++++
 tools/maven/suppressions.xml                    |   2 +-
 21 files changed, 1531 insertions(+), 2284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml
index 68f722c..ff4decc 100644
--- a/flink-addons/flink-avro/pom.xml
+++ b/flink-addons/flink-avro/pom.xml
@@ -54,10 +54,11 @@ under the License.
 			<!-- version is derived from base module -->
 		</dependency>
 		
+		<!--  guava needs to be in "provided" scope, so to not re-export the dependency -->
 		<dependency>
-			<groupId>org.apache.avro</groupId>
-			<artifactId>avro-mapred</artifactId>
-			<!-- version is derived from base module -->
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<scope>provided</scope>
 		</dependency>
 		
 		<dependency>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
deleted file mode 100644
index 0d64910..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroBaseValue<T>> {
-	
-	private static final long serialVersionUID = 1L;
-
-
-	public AvroBaseValue() {}
-	
-	public AvroBaseValue(T datum) {
-		super(datum);
-	}
-
-	
-	// --------------------------------------------------------------------------------------------
-	//  Serialization / Deserialization
-	// --------------------------------------------------------------------------------------------
-	
-	private ReflectDatumWriter<T> writer;
-	private ReflectDatumReader<T> reader;
-	
-	private DataOutputEncoder encoder;
-	private DataInputDecoder decoder;
-	
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		// the null flag
-		if (datum() == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			
-			DataOutputEncoder encoder = getEncoder();
-			encoder.setOut(out);
-			getWriter().write(datum(), encoder);
-		}
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		// the null flag
-		if (in.readBoolean()) {
-			
-			DataInputDecoder decoder = getDecoder();
-			decoder.setIn(in);
-			datum(getReader().read(datum(), decoder));
-		}
-	}
-	
-	private ReflectDatumWriter<T> getWriter() {
-		if (this.writer == null) {
-			@SuppressWarnings("unchecked")
-			Class<T> clazz = (Class<T>) datum().getClass();
-			this.writer = new ReflectDatumWriter<T>(clazz);
-		}
-		return this.writer;
-	}
-	
-	private ReflectDatumReader<T> getReader() {
-		if (this.reader == null) {
-			Class<T> datumClass = ReflectionUtil.getTemplateType1(getClass());
-			this.reader = new ReflectDatumReader<T>(datumClass);
-		}
-		return this.reader;
-	}
-	
-	private DataOutputEncoder getEncoder() {
-		if (this.encoder == null) {
-			this.encoder = new DataOutputEncoder();
-		}
-		return this.encoder;
-	}
-	
-	private DataInputDecoder getDecoder() {
-		if (this.decoder == null) {
-			this.decoder = new DataInputDecoder();
-		}
-		return this.decoder;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Hashing / Equality
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return datum() == null ? 0 : datum().hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj.getClass() == this.getClass()) {
-			Object otherDatum = ((AvroBaseValue<?>) obj).datum();
-			Object thisDatum = datum();
-			
-			if (thisDatum == null) {
-				return otherDatum == null;
-			} else {
-				return thisDatum.equals(otherDatum);
-			}
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public String toString() {
-		return "AvroBaseValue (" + datum() + ")";
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	public int compareTo(AvroBaseValue<T> o) {
-		Object otherDatum = o.datum();
-		Object thisDatum = datum();
-		
-		if (thisDatum == null) {
-			return otherDatum == null ? 0 : -1;
-		} else {
-			return otherDatum == null ? 1: ((Comparable<Object>) thisDatum).compareTo(otherDatum);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
new file mode 100644
index 0000000..6affeec
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro.example;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("serial")
+public class AvroTypeExample {
+	
+	
+	public static void main(String[] args) throws Exception {
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<User> users = env.createInput(new UserGeneratingInputFormat());
+		
+		users
+			.map(new NumberExtractingMapper())
+			.groupBy(1)
+			.reduceGroup(new ConcatenatingReducer())
+			.print();
+		
+		env.execute();
+	}
+	
+	
+	
+	public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
+		
+		@Override
+		public Tuple2<User, Integer> map(User user) {
+			return new Tuple2<User, Integer>(user, user.getFavoriteNumber());
+		}
+	}
+	
+	
+	public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
+
+		@Override
+		public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
+			int number = 0;
+			StringBuilder colors = new StringBuilder();
+			
+			for (Tuple2<User, Integer> u : values) {
+				number = u.f1;
+				colors.append(u.f0.getFavoriteColor()).append(" - ");
+			}
+			
+			colors.setLength(colors.length() - 3);
+			out.collect(new Tuple2<Integer, String>(number, colors.toString()));
+		}
+	}
+	
+	
+	public static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
+
+		private static final long serialVersionUID = 1L;
+		
+		private static final int NUM = 100;
+		
+		private final Random rnd = new Random(32498562304986L);
+		
+		private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
+		
+		private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
+		
+		private int count;
+		
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			return count >= NUM;
+		}
+
+		@Override
+		public User nextRecord(User reuse) throws IOException {
+			count++;
+			
+			User u = new User();
+			u.setName(NAMES[rnd.nextInt(NAMES.length)]);
+			u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
+			u.setFavoriteNumber(rnd.nextInt(87));
+			return u;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
new file mode 100644
index 0000000..3394d60
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.example;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+  @Deprecated public java.lang.CharSequence name;
+  @Deprecated public java.lang.Integer favorite_number;
+  @Deprecated public java.lang.CharSequence favorite_color;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use {@link #newBuilder()}. 
+   */
+  public User() {}
+
+  /**
+   * All-args constructor.
+   */
+  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
+    this.name = name;
+    this.favorite_number = favorite_number;
+    this.favorite_color = favorite_color;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call. 
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return name;
+    case 1: return favorite_number;
+    case 2: return favorite_color;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call. 
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: name = (java.lang.CharSequence)value$; break;
+    case 1: favorite_number = (java.lang.Integer)value$; break;
+    case 2: favorite_color = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'name' field.
+   */
+  public java.lang.CharSequence getName() {
+    return name;
+  }
+
+  /**
+   * Sets the value of the 'name' field.
+   * @param value the value to set.
+   */
+  public void setName(java.lang.CharSequence value) {
+    this.name = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_number' field.
+   */
+  public java.lang.Integer getFavoriteNumber() {
+    return favorite_number;
+  }
+
+  /**
+   * Sets the value of the 'favorite_number' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteNumber(java.lang.Integer value) {
+    this.favorite_number = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_color' field.
+   */
+  public java.lang.CharSequence getFavoriteColor() {
+    return favorite_color;
+  }
+
+  /**
+   * Sets the value of the 'favorite_color' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteColor(java.lang.CharSequence value) {
+    this.favorite_color = value;
+  }
+
+  /** Creates a new User RecordBuilder */
+  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() {
+    return new org.apache.flink.api.io.avro.example.User.Builder();
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing Builder */
+  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) {
+    return new org.apache.flink.api.io.avro.example.User.Builder(other);
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing User instance */
+  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User other) {
+    return new org.apache.flink.api.io.avro.example.User.Builder(other);
+  }
+  
+  /**
+   * RecordBuilder for User instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
+    implements org.apache.avro.data.RecordBuilder<User> {
+
+    private java.lang.CharSequence name;
+    private java.lang.Integer favorite_number;
+    private java.lang.CharSequence favorite_color;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.flink.api.io.avro.example.User.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+    }
+    
+    /** Creates a Builder by copying an existing User instance */
+    private Builder(org.apache.flink.api.io.avro.example.User other) {
+            super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+    }
+
+    /** Gets the value of the 'name' field */
+    public java.lang.CharSequence getName() {
+      return name;
+    }
+    
+    /** Sets the value of the 'name' field */
+    public org.apache.flink.api.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.name = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'name' field has been set */
+    public boolean hasName() {
+      return fieldSetFlags()[0];
+    }
+    
+    /** Clears the value of the 'name' field */
+    public org.apache.flink.api.io.avro.example.User.Builder clearName() {
+      name = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_number' field */
+    public java.lang.Integer getFavoriteNumber() {
+      return favorite_number;
+    }
+    
+    /** Sets the value of the 'favorite_number' field */
+    public org.apache.flink.api.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
+      validate(fields()[1], value);
+      this.favorite_number = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_number' field has been set */
+    public boolean hasFavoriteNumber() {
+      return fieldSetFlags()[1];
+    }
+    
+    /** Clears the value of the 'favorite_number' field */
+    public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteNumber() {
+      favorite_number = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_color' field */
+    public java.lang.CharSequence getFavoriteColor() {
+      return favorite_color;
+    }
+    
+    /** Sets the value of the 'favorite_color' field */
+    public org.apache.flink.api.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
+      validate(fields()[2], value);
+      this.favorite_color = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_color' field has been set */
+    public boolean hasFavoriteColor() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the value of the 'favorite_color' field */
+    public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteColor() {
+      favorite_color = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    @Override
+    public User build() {
+      try {
+        User record = new User();
+        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
+        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
deleted file mode 100644
index a898f8d..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public class AvroInputFormat<E> extends FileInputFormat {
-	
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
-	
-	
-	private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;
-	
-	private final Class<E> avroValueType;
-	
-
-	private transient FileReader<E> dataFileReader;
-	
-	private transient E reuseAvroValue;
-	
-	private transient AvroBaseValue<E> wrapper;
-	
-	
-	public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass) {
-		this.avroWrapperTypeClass = wrapperClass;
-		this.avroValueType = ReflectionUtil.getTemplateType1(wrapperClass);
-		this.unsplittable = true;
-	}
-	
-	public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass, Class<E> avroType) {
-		this.avroValueType = avroType;
-		this.avroWrapperTypeClass = wrapperClass;
-		this.unsplittable = true;
-	}
-
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-		
-		this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
-		
-		DatumReader<E> datumReader;
-		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
-			datumReader = new SpecificDatumReader<E>(avroValueType);
-		} else {
-			datumReader = new ReflectDatumReader<E>(avroValueType);
-		}
-		
-		LOG.info("Opening split " + split);
-		
-		SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
-		
-		dataFileReader = DataFileReader.openReader(in, datumReader);
-		dataFileReader.sync(split.getStart());
-		
-		reuseAvroValue = null;
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !dataFileReader.hasNext();
-	}
-
-	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if (!dataFileReader.hasNext()) {
-			return null;
-		}
-		
-		reuseAvroValue = dataFileReader.next(reuseAvroValue);
-		wrapper.datum(reuseAvroValue);
-		record.setField(0, wrapper);
-		return record;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
deleted file mode 100644
index 1639ec4..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-/**
- * Input format to read Avro files.
- * 
- * The input format currently supports only flat avro schemas. So there is no
- * support for complex types except for nullable primitve fields, e.g.
- * ["string", null] (See
- * http://avro.apache.org/docs/current/spec.html#schema_complex)
- * 
- */
-public class AvroRecordInputFormat extends FileInputFormat {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(AvroRecordInputFormat.class);
-
-	private FileReader<GenericRecord> dataFileReader;
-	private GenericRecord reuseAvroRecord = null;
-
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-		DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
-		SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
-		LOG.info("Opening split " + split);
-		dataFileReader = DataFileReader.openReader(in, datumReader);
-		dataFileReader.sync(split.getStart());
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !dataFileReader.hasNext();
-	}
-
-	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if (!dataFileReader.hasNext()) {
-			return null;
-		}
-		if (record == null) {
-			throw new IllegalArgumentException("Empty PactRecord given");
-		}
-		reuseAvroRecord = dataFileReader.next(reuseAvroRecord);
-		final List<Field> fields = reuseAvroRecord.getSchema().getFields();
-		for (Field field : fields) {
-			final Value value = convertAvroToPactValue(field, reuseAvroRecord.get(field.pos()));
-			record.setField(field.pos(), value);
-			record.updateBinaryRepresenation();
-		}
-
-		return record;
-	}
-
-
-	@SuppressWarnings("unchecked")
-	private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
-		if (avroRecord == null) {
-			return null;
-		}
-		final Type type = checkTypeConstraintsAndGetType(field.schema());
-
-		// check for complex types
-		// (complex type FIXED is not yet supported)
-		switch (type) {
-			case ARRAY:
-				final Type elementType = field.schema().getElementType().getType();
-				final List<?> avroList = (List<?>) avroRecord;
-				return convertAvroArrayToListValue(elementType, avroList);
-			case ENUM:
-				final List<String> symbols = field.schema().getEnumSymbols();
-				final String avroRecordString = avroRecord.toString();
-				if (!symbols.contains(avroRecordString)) {
-					throw new RuntimeException("The given Avro file contains field with a invalid enum symbol");
-				}
-				sString.setValue(avroRecordString);
-				return sString;
-			case MAP:
-				final Type valueType = field.schema().getValueType().getType();
-				final Map<CharSequence, ?> avroMap = (Map<CharSequence, ?>) avroRecord;
-				return convertAvroMapToMapValue(valueType, avroMap);
-	
-			// primitive type
-			default:
-				return convertAvroPrimitiveToValue(type, avroRecord);
-
-		}
-	}
-
-	private final ListValue<?> convertAvroArrayToListValue(Type elementType, List<?> avroList) {
-		switch (elementType) {
-		case STRING:
-			StringListValue sl = new StringListValue();
-			for (Object item : avroList) {
-				sl.add(new StringValue((CharSequence) item));
-			}
-			return sl;
-		case INT:
-			IntListValue il = new IntListValue();
-			for (Object item : avroList) {
-				il.add(new IntValue((Integer) item));
-			}
-			return il;
-		case BOOLEAN:
-			BooleanListValue bl = new BooleanListValue();
-			for (Object item : avroList) {
-				bl.add(new BooleanValue((Boolean) item));
-			}
-			return bl;
-		case DOUBLE:
-			DoubleListValue dl = new DoubleListValue();
-			for (Object item : avroList) {
-				dl.add(new DoubleValue((Double) item));
-			}
-			return dl;
-		case FLOAT:
-			FloatListValue fl = new FloatListValue();
-			for (Object item : avroList) {
-				fl.add(new FloatValue((Float) item));
-			}
-			return fl;
-		case LONG:
-			LongListValue ll = new LongListValue();
-			for (Object item : avroList) {
-				ll.add(new LongValue((Long) item));
-			}
-			return ll;
-		default:
-			throw new RuntimeException("Elements of type " + elementType + " are not supported for Avro arrays.");
-		}
-	}
-
-	private final MapValue<StringValue, ?> convertAvroMapToMapValue(Type mapValueType, Map<CharSequence, ?> avroMap) {
-		switch (mapValueType) {
-		case STRING:
-			StringMapValue sm = new StringMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				sm.put(new StringValue((CharSequence) entry.getKey()), new StringValue((String) entry.getValue()));
-			}
-			return sm;
-		case INT:
-			IntMapValue im = new IntMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				im.put(new StringValue((CharSequence) entry.getKey()), new IntValue((Integer) entry.getValue()));
-			}
-			return im;
-		case BOOLEAN:
-			BooleanMapValue bm = new BooleanMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				bm.put(new StringValue((CharSequence) entry.getKey()), new BooleanValue((Boolean) entry.getValue()));
-			}
-			return bm;
-		case DOUBLE:
-			DoubleMapValue dm = new DoubleMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				dm.put(new StringValue((CharSequence) entry.getKey()), new DoubleValue((Double) entry.getValue()));
-			}
-			return dm;
-		case FLOAT:
-			FloatMapValue fm = new FloatMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				fm.put(new StringValue((CharSequence) entry.getKey()), new FloatValue((Float) entry.getValue()));
-			}
-			return fm;
-		case LONG:
-			LongMapValue lm = new LongMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				lm.put(new StringValue((CharSequence) entry.getKey()), new LongValue((Long) entry.getValue()));
-			}
-			return lm;
-
-		default:
-			throw new RuntimeException("Map values of type " + mapValueType + " are not supported for Avro map.");
-		}
-	}
-
-	private StringValue sString = new StringValue();
-	private IntValue sInt = new IntValue();
-	private BooleanValue sBool = new BooleanValue();
-	private DoubleValue sDouble = new DoubleValue();
-	private FloatValue sFloat = new FloatValue();
-	private LongValue sLong = new LongValue();
-	
-	private final Value convertAvroPrimitiveToValue(Type type, Object avroRecord) {
-		switch (type) {
-		case STRING:
-			sString.setValue((CharSequence) avroRecord);
-			return sString;
-		case INT:
-			sInt.setValue((Integer) avroRecord);
-			return sInt;
-		case BOOLEAN:
-			sBool.setValue((Boolean) avroRecord);
-			return sBool;
-		case DOUBLE:
-			sDouble.setValue((Double) avroRecord);
-			return sDouble;
-		case FLOAT:
-			sFloat.setValue((Float) avroRecord);
-			return sFloat;
-		case LONG:
-			sLong.setValue((Long) avroRecord);
-			return sLong;
-		case NULL:
-			return NullValue.getInstance();
-		default:
-			throw new RuntimeException(
-					"Type "
-							+ type
-							+ " for AvroInputFormat is not implemented. Open an issue on GitHub.");
-		}
-	}
-
-	private final Type checkTypeConstraintsAndGetType(final Schema schema) {
-		final Type type = schema.getType();
-		if (type == Type.RECORD) {
-			throw new RuntimeException("The given Avro file contains complex data types which are not supported right now");
-		}
-
-		if (type == Type.UNION) {
-			List<Schema> types = schema.getTypes();
-			if (types.size() > 2) {
-				throw new RuntimeException("The given Avro file contains a union that has more than two elements");
-			}
-			if (types.size() == 1 && types.get(0).getType() != Type.UNION) {
-				return types.get(0).getType();
-			}
-			if (types.get(0).getType() == Type.UNION || types.get(1).getType() == Type.UNION) {
-				throw new RuntimeException("The given Avro file contains a nested union");
-			}
-			if (types.get(0).getType() == Type.NULL) {
-				return types.get(1).getType();
-			} else {
-				if (types.get(1).getType() != Type.NULL) {
-					throw new RuntimeException("The given Avro file is contains a union with two non-null types.");
-				}
-				return types.get(0).getType();
-			}
-		}
-		return type;
-	}
-
-	/**
-	 * Set minNumSplits to number of files.
-	 */
-	@Override
-	public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-		int numAvroFiles = 0;
-		final Path path = this.filePath;
-		// get all the files that are involved in the splits
-		final FileSystem fs = path.getFileSystem();
-		final FileStatus pathFile = fs.getFileStatus(path);
-
-		if (!acceptFile(pathFile)) {
-			throw new IOException("The given file does not pass the file-filter");
-		}
-		if (pathFile.isDir()) {
-			// input is directory. list all contained files
-			final FileStatus[] dir = fs.listStatus(path);
-			for (int i = 0; i < dir.length; i++) {
-				if (!dir[i].isDir() && acceptFile(dir[i])) {
-					numAvroFiles++;
-				}
-			}
-		} else {
-			numAvroFiles = 1;
-		}
-		return super.createInputSplits(numAvroFiles);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Concrete subclasses of ListValue and MapValue for all possible primitive types
-	// --------------------------------------------------------------------------------------------
-
-	public static class StringListValue extends ListValue<StringValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class IntListValue extends ListValue<IntValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class BooleanListValue extends ListValue<BooleanValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class DoubleListValue extends ListValue<DoubleValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class FloatListValue extends ListValue<FloatValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class LongListValue extends ListValue<LongValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class StringMapValue extends MapValue<StringValue, StringValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class IntMapValue extends MapValue<StringValue, IntValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class BooleanMapValue extends MapValue<StringValue, BooleanValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class DoubleMapValue extends MapValue<StringValue, DoubleValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class FloatMapValue extends MapValue<StringValue, FloatValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class LongMapValue extends MapValue<StringValue, LongValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
deleted file mode 100644
index d6d1213..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-
-public class ReflectiveAvroTypeExample {
-	
-	
-	public static void main(String[] args) throws Exception {
-		
-		GenericDataSource<UserGeneratingInputFormat> source = new GenericDataSource<UserGeneratingInputFormat>(UserGeneratingInputFormat.class);
-		
-		MapOperator mapper = MapOperator.builder(new NumberExtractingMapper())
-				.input(source).name("le mapper").build();
-		
-		ReduceOperator reducer = ReduceOperator.builder(new ConcatenatingReducer(), IntValue.class, 1)
-				.input(mapper).name("le reducer").build();
-		
-		GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, reducer);
-		
-		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
-		
-		LocalExecutor.execute(p);
-	}
-	
-	
-	public static final class NumberExtractingMapper extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			User u = record.getField(0, SUser.class).datum();
-			record.setField(1, new IntValue(u.getFavoriteNumber()));
-			out.collect(record);
-		}
-	}
-	
-	
-	public static final class ConcatenatingReducer extends ReduceFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		private final Record result = new Record(2);
-
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-			Record r = records.next();
-			
-			int num = r.getField(1, IntValue.class).getValue();
-			String names = r.getField(0, SUser.class).datum().getFavoriteColor().toString();
-			
-			while (records.hasNext()) {
-				r = records.next();
-				names += " - " + r.getField(0, SUser.class).datum().getFavoriteColor().toString();
-			}
-			
-			result.setField(0, new IntValue(num));
-			result.setField(1,  new StringValue(names));
-			out.collect(result);
-		}
-
-	}
-	
-	
-	public static final class UserGeneratingInputFormat extends GenericInputFormat {
-
-		private static final long serialVersionUID = 1L;
-		
-		private static final int NUM = 100;
-		
-		private final Random rnd = new Random(32498562304986L);
-		
-		private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
-		
-		private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
-		
-		private int count;
-		
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			return count >= NUM;
-		}
-
-		@Override
-		public Record nextRecord(Record record) throws IOException {
-			count++;
-			
-			User u = new User();
-			u.setName(NAMES[rnd.nextInt(NAMES.length)]);
-			u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
-			u.setFavoriteNumber(rnd.nextInt(87));
-			
-			SUser su = new SUser();
-			su.datum(u);
-			
-			record.setField(0, su);
-			return record;
-		}
-	}
-	
-	public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void configure(Configuration parameters) {}
-
-		@Override
-		public void open(int taskNumber, int numTasks) throws IOException {}
-
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			int color = record.getField(0, IntValue.class).getValue();
-			String names = record.getField(1, StringValue.class).getValue();
-			
-			System.out.println(color + ": " + names);
-		}
-		
-		@Override
-		public void close() throws IOException {}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
deleted file mode 100644
index 542639e..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import org.apache.flink.api.avro.AvroBaseValue;
-
-public class SUser extends AvroBaseValue<User> {
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
deleted file mode 100644
index d4fb292..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.example;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public java.lang.CharSequence name;
-  @Deprecated public java.lang.Integer favorite_number;
-  @Deprecated public java.lang.CharSequence favorite_color;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use {@link \#newBuilder()}. 
-   */
-  public User() {}
-
-  /**
-   * All-args constructor.
-   */
-  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
-    this.name = name;
-    this.favorite_number = favorite_number;
-    this.favorite_color = favorite_color;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call. 
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return name;
-    case 1: return favorite_number;
-    case 2: return favorite_color;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-  // Used by DatumReader.  Applications should not call. 
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: name = (java.lang.CharSequence)value$; break;
-    case 1: favorite_number = (java.lang.Integer)value$; break;
-    case 2: favorite_color = (java.lang.CharSequence)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'name' field.
-   */
-  public java.lang.CharSequence getName() {
-    return name;
-  }
-
-  /**
-   * Sets the value of the 'name' field.
-   * @param value the value to set.
-   */
-  public void setName(java.lang.CharSequence value) {
-    this.name = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_number' field.
-   */
-  public java.lang.Integer getFavoriteNumber() {
-    return favorite_number;
-  }
-
-  /**
-   * Sets the value of the 'favorite_number' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteNumber(java.lang.Integer value) {
-    this.favorite_number = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_color' field.
-   */
-  public java.lang.CharSequence getFavoriteColor() {
-    return favorite_color;
-  }
-
-  /**
-   * Sets the value of the 'favorite_color' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteColor(java.lang.CharSequence value) {
-    this.favorite_color = value;
-  }
-
-  /** Creates a new User RecordBuilder */
-  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder() {
-    return new org.apache.flink.api.java.record.io.avro.example.User.Builder();
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing Builder */
-  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
-    return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing User instance */
-  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User other) {
-    return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
-  }
-  
-  /**
-   * RecordBuilder for User instances.
-   */
-  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
-    implements org.apache.avro.data.RecordBuilder<User> {
-
-    private java.lang.CharSequence name;
-    private java.lang.Integer favorite_number;
-    private java.lang.CharSequence favorite_color;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
-    }
-    
-    /** Creates a Builder by copying an existing Builder */
-    private Builder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-    
-    /** Creates a Builder by copying an existing User instance */
-    private Builder(org.apache.flink.api.java.record.io.avro.example.User other) {
-            super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-
-    /** Gets the value of the 'name' field */
-    public java.lang.CharSequence getName() {
-      return name;
-    }
-    
-    /** Sets the value of the 'name' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
-      validate(fields()[0], value);
-      this.name = value;
-      fieldSetFlags()[0] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'name' field has been set */
-    public boolean hasName() {
-      return fieldSetFlags()[0];
-    }
-    
-    /** Clears the value of the 'name' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearName() {
-      name = null;
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_number' field */
-    public java.lang.Integer getFavoriteNumber() {
-      return favorite_number;
-    }
-    
-    /** Sets the value of the 'favorite_number' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
-      validate(fields()[1], value);
-      this.favorite_number = value;
-      fieldSetFlags()[1] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_number' field has been set */
-    public boolean hasFavoriteNumber() {
-      return fieldSetFlags()[1];
-    }
-    
-    /** Clears the value of the 'favorite_number' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteNumber() {
-      favorite_number = null;
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_color' field */
-    public java.lang.CharSequence getFavoriteColor() {
-      return favorite_color;
-    }
-    
-    /** Sets the value of the 'favorite_color' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
-      validate(fields()[2], value);
-      this.favorite_color = value;
-      fieldSetFlags()[2] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_color' field has been set */
-    public boolean hasFavoriteColor() {
-      return fieldSetFlags()[2];
-    }
-    
-    /** Clears the value of the 'favorite_color' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteColor() {
-      favorite_color = null;
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    @Override
-    public User build() {
-      try {
-        User record = new User();
-        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
-        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
-        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
-        return record;
-      } catch (Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index a60787d..db235c0 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -28,11 +28,11 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.io.avro.example.User;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.record.io.avro.example.User;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.util.JavaProgramTestBase;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
deleted file mode 100644
index a3850e2..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
-
-	@Override
-	protected Plan getTestJob() {
-		GenericDataSource<RandomInputFormat> bookSource = new GenericDataSource<RandomInputFormat>(
-			new RandomInputFormat(true));
-		GenericDataSource<RandomInputFormat> authorSource = new GenericDataSource<RandomInputFormat>(
-			new RandomInputFormat(false));
-
-		CoGroupOperator coGroupOperator = CoGroupOperator.builder(MyCoGrouper.class, LongValue.class, 0, 0)
-			.input1(bookSource).input2(authorSource).name("CoGrouper Test").build();
-
-		GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, coGroupOperator);
-
-		Plan plan = new Plan(sink, "CoGroper Test Plan");
-		plan.setDefaultParallelism(1);
-		return plan;
-	}
-
-	public static class SBookAvroValue extends AvroBaseValue<Book> {
-		private static final long serialVersionUID = 1L;
-
-		public SBookAvroValue() {}
-
-		public SBookAvroValue(Book datum) {
-			super(datum);
-		}
-	}
-
-	public static class Book {
-
-		long bookId;
-		@Nullable
-		String title;
-		long authorId;
-
-		public Book() {
-		}
-
-		public Book(long bookId, String title, long authorId) {
-			this.bookId = bookId;
-			this.title = title;
-			this.authorId = authorId;
-		}
-	}
-
-	public static class SBookAuthorValue extends AvroBaseValue<BookAuthor> {
-		private static final long serialVersionUID = 1L;
-
-		public SBookAuthorValue() {}
-
-		public SBookAuthorValue(BookAuthor datum) {
-			super(datum);
-		}
-	}
-
-	public static class BookAuthor {
-
-		enum BookType {
-			book,
-			article,
-			journal
-		}
-
-		long authorId;
-
-		@Nullable
-		List<String> bookTitles;
-
-		@Nullable
-		List<Book> books;
-
-		String authorName;
-
-		BookType bookType;
-
-		public BookAuthor() {}
-
-		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
-			this.authorId = authorId;
-			this.bookTitles = bookTitles;
-			this.authorName = authorName;
-		}
-	}
-
-	public static class RandomInputFormat extends GenericInputFormat {
-		private static final long serialVersionUID = 1L;
-
-		private final boolean isBook;
-
-		private boolean touched = false;
-
-		public RandomInputFormat(boolean isBook) {
-			this.isBook = isBook;
-		}
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			return touched;
-		}
-
-		@Override
-		public Record nextRecord(Record record) throws IOException {
-			touched = true;
-			record.setField(0, new LongValue(26382648));
-
-			if (isBook) {
-				Book b = new Book(123, "This is a test book", 26382648);
-				record.setField(1, new SBookAvroValue(b));
-			} else {
-				List<String> titles = new ArrayList<String>();
-				// titles.add("Title1");
-				// titles.add("Title2");
-				// titles.add("Title3");
-
-				List<Book> books = new ArrayList<Book>();
-				books.add(new Book(123, "This is a test book", 1));
-				books.add(new Book(24234234, "This is a test book", 1));
-				books.add(new Book(1234324, "This is a test book", 3));
-
-				BookAuthor a = new BookAuthor(1, titles, "Test Author");
-				a.books = books;
-				a.bookType = BookAuthor.BookType.journal;
-				record.setField(1, new SBookAuthorValue(a));
-			}
-
-			return record;
-		}
-	}
-
-	public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void configure(Configuration parameters) {}
-
-		@Override
-		public void open(int taskNumber, int numTasks) {}
-
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			long key = record.getField(0, LongValue.class).getValue();
-			String val = record.getField(1, StringValue.class).getValue();
-			System.out.println(key + " : " + val);
-		}
-
-		@Override
-		public void close() {}
-	}
-
-	public static class MyCoGrouper extends CoGroupFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-
-			Record r1 = null;
-			while (records1.hasNext()) {
-				r1 = records1.next();
-			}
-			Record r2 = null;
-			while (records2.hasNext()) {
-				r2 = records2.next();
-			}
-
-			if (r1 != null) {
-				r1.getField(1, SBookAvroValue.class).datum();
-			}
-
-			if (r2 != null) {
-				r2.getField(1, SBookAuthorValue.class).datum();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
index 78ff2f1..74b5397 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -31,8 +31,8 @@ import java.util.Random;
 
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.java.record.io.avro.generated.Colors;
-import org.apache.flink.api.java.record.io.avro.generated.User;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
 import org.apache.flink.util.StringUtils;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 31e083f..25e2e0c 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -39,7 +39,6 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.avro.AvroBaseValue;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -125,18 +124,6 @@ public class AvroExternalJarProgram  {
 		}
 	}
 	
-	
-	public static final class SUser extends AvroBaseValue<MyUser> {
-		
-		static final long serialVersionUID = 1L;
-
-		public SUser() {}
-	
-		public SUser(MyUser u) {
-			super(u);
-		}
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
new file mode 100644
index 0000000..d8d8b46
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the avro input format.
+ * (The testcase is mostly the getting started tutorial of avro)
+ * http://avro.apache.org/docs/current/gettingstartedjava.html
+ */
+public class AvroRecordInputFormatTest {
+	
+	private File testFile;
+	
+	final static String TEST_NAME = "Alyssa";
+	
+	final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+	final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+	
+	final static boolean TEST_ARRAY_BOOLEAN_1 = true;
+	final static boolean TEST_ARRAY_BOOLEAN_2 = false;
+	
+	final static Colors TEST_ENUM_COLOR = Colors.GREEN;
+	
+	final static String TEST_MAP_KEY1 = "KEY 1";
+	final static long TEST_MAP_VALUE1 = 8546456L;
+	final static String TEST_MAP_KEY2 = "KEY 2";
+	final static long TEST_MAP_VALUE2 = 17554L;
+
+	@Before
+	public void createFiles() throws IOException {
+		testFile = File.createTempFile("AvroInputFormatTest", null);
+		
+		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+		stringArray.add(TEST_ARRAY_STRING_1);
+		stringArray.add(TEST_ARRAY_STRING_2);
+		
+		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+		
+		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+		
+		
+		User user1 = new User();
+		user1.setName(TEST_NAME);
+		user1.setFavoriteNumber(256);
+		user1.setTypeDoubleTest(123.45d);
+		user1.setTypeBoolTest(true);
+		user1.setTypeArrayString(stringArray);
+		user1.setTypeArrayBoolean(booleanArray);
+		user1.setTypeEnum(TEST_ENUM_COLOR);
+		user1.setTypeMap(longMap);
+		
+		// Construct via builder
+		User user2 = User.newBuilder()
+		             .setName("Charlie")
+		             .setFavoriteColor("blue")
+		             .setFavoriteNumber(null)
+		             .setTypeBoolTest(false)
+		             .setTypeDoubleTest(1.337d)
+		             .setTypeNullTest(null)
+		             .setTypeLongTest(1337L)
+		             .setTypeArrayString(new ArrayList<CharSequence>())
+		             .setTypeArrayBoolean(new ArrayList<Boolean>())
+		             .setTypeNullableArray(null)
+		             .setTypeEnum(Colors.RED)
+		             .setTypeMap(new HashMap<CharSequence, Long>())
+		             .build();
+		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
+		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+		dataFileWriter.create(user1.getSchema(), testFile);
+		dataFileWriter.append(user1);
+		dataFileWriter.append(user2);
+		dataFileWriter.close();
+	}
+	
+	@Test
+	public void testDeserialisation() throws IOException {
+		Configuration parameters = new Configuration();
+		
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		
+		format.configure(parameters);
+		FileInputSplit[] splits = format.createInputSplits(1);
+		assertEquals(splits.length, 1);
+		format.open(splits[0]);
+		
+		User u = format.nextRecord(null);
+		assertNotNull(u);
+		
+		String name = u.getName().toString();
+		assertNotNull("empty record", name);
+		assertEquals("name not equal", TEST_NAME, name);
+		
+		// check arrays
+		List<CharSequence> sl = u.getTypeArrayString();
+		assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+		assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+		
+		List<Boolean> bl = u.getTypeArrayBoolean();
+		assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+		assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+		
+		// check enums
+		Colors enumValue = u.getTypeEnum();
+		assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+		
+		// check maps
+		Map<CharSequence, Long> lm = u.getTypeMap();
+		assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+		assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+		
+		assertFalse("expecting second element", format.reachedEnd());
+		assertNotNull("expecting second element", format.nextRecord(u));
+		
+		assertNull(format.nextRecord(u));
+		assertTrue(format.reachedEnd());
+		
+		format.close();
+	}
+	
+	@After
+	public void deleteFiles() {
+		testFile.delete();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
new file mode 100644
index 0000000..58e1f5c
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.generated;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public enum Colors { 
+  RED, GREEN, BLUE  ;
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+}


[9/9] git commit: [FLINK-1118] Exclude log (and crash) files from rat checks.

Posted by se...@apache.org.
[FLINK-1118] Exclude log (and crash) files from rat checks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/15e59906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/15e59906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/15e59906

Branch: refs/heads/master
Commit: 15e5990624d7ae6a0c17beae04daf4bbfa63e142
Parents: 719e288
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 30 14:36:09 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:59 2014 +0200

----------------------------------------------------------------------
 pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15e59906/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 081ccc0..3077778 100644
--- a/pom.xml
+++ b/pom.xml
@@ -535,6 +535,7 @@ under the License.
 						<!-- Additional files like .gitignore etc.-->
 						<exclude>**/.*</exclude>
 						<exclude>**/*.prefs</exclude>
+						<exclude>**/*.log</exclude>
 						<!-- Resource files which have values. -->
 						<exclude>**/resources/**</exclude>
 						<!-- Configuration Files. -->


[6/9] git commit: Major dependency cleanups - add "flink-shading" for shaded libraries (currently guava only) - relocate guava in all projects and exclude it from the lib and exported dependencies - Exclude unnecessary transitive dependencies from

Posted by se...@apache.org.
Major dependency cleanups
  - add "flink-shading" for shaded libraries (currently guava only)
  - relocate guava in all projects and exclude it from the lib and exported dependencies
  - Exclude unnecessary transitive dependencies from hadoop 1


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/758de7db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/758de7db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/758de7db

Branch: refs/heads/master
Commit: 758de7dbcd5148e17d94a45c5f6b74016b013a70
Parents: 7e43b1c
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 29 20:39:09 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200

----------------------------------------------------------------------
 .../flink-streaming-connectors/pom.xml          |  51 +++++-
 .../flink-streaming-core/pom.xml                |   2 +-
 .../flink-streaming-examples/pom.xml            |   2 +-
 flink-addons/flink-streaming/pom.xml            |  20 ---
 flink-core/pom.xml                              |   8 +
 .../api/common/accumulators/Histogram.java      |   5 +-
 flink-dist/pom.xml                              |  12 ++
 flink-dist/src/main/assemblies/bin.xml          |  27 +--
 flink-dist/src/main/flink-bin/bin/flink         |   6 -
 flink-dist/src/main/flink-bin/bin/webclient.sh  |   6 -
 flink-runtime/pom.xml                           |  28 +++
 flink-shaded/pom.xml                            |  80 +++++++++
 pom.xml                                         | 172 +++++++++++--------
 13 files changed, 272 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index 9777477..e941112 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -99,8 +99,8 @@ under the License.
 			<artifactId>spymemcached</artifactId>
 			<version>2.8.4</version>
 		</dependency>
-
-    </dependencies>
+		
+	</dependencies>
 
 	<build>
 		<plugins>
@@ -115,14 +115,47 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+			
+						<!-- Relocate the entire Google Guava dependency into a different namespace -->
 			<plugin>
-            	<artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-            	    <descriptorRefs>
-                		<descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-            	</configuration>
-        	</plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>2.3</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<shadedArtifactAttached>false</shadedArtifactAttached>
+					<createDependencyReducedPom>true</createDependencyReducedPom>
+					<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+					<artifactSet>
+						<includes>
+							<include>org.apache.flume:*</include>
+							<include>com.twitter:*</include>
+						</includes>
+					</artifactSet>
+					<relocations>
+						<relocation>
+							<pattern>com.google</pattern>
+							<shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+						</relocation>
+					</relocations>
+					<filters>
+						<filter>
+							<artifact>*:*</artifact>
+							<excludes>
+								<exclude>build.properties</exclude>
+							</excludes>
+						</filter>
+					</filters>
+				</configuration>
+			</plugin>
+			
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-addons/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
index 0be9839..63b1145 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
@@ -46,7 +46,7 @@ under the License.
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-math</artifactId>
 			<version>2.2</version>
-        </dependency>
+		</dependency>
 
 	</dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 7c02f27..228d69c 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -48,7 +48,7 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
-        	<dependency>
+		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-connectors</artifactId>
 			<version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-addons/flink-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml
index a354df6..2fcf5a8 100644
--- a/flink-addons/flink-streaming/pom.xml
+++ b/flink-addons/flink-streaming/pom.xml
@@ -44,51 +44,31 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
-			<type>jar</type>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-compiler</artifactId>
 			<version>${project.version}</version>
-			<type>jar</type>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime</artifactId>
 			<version>${project.version}</version>
-			<type>jar</type>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-clients</artifactId>
 			<version>${project.version}</version>
-			<type>jar</type>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
-			<type>jar</type>
 		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests</artifactId>
-			<version>${project.version}</version>
-			<type>jar</type>
-		</dependency>
-	
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-lang3</artifactId>
-			<version>3.3.2</version>
-			<type>jar</type>
-		</dependency>
-
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index e0c0a2f..b3ca398 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -34,6 +34,14 @@ under the License.
 
 	<packaging>jar</packaging>
 
+	<dependencies>
+		<dependency>
+	                <groupId>org.apache.flink</groupId>
+	                <artifactId>flink-shaded</artifactId>
+	                <version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
 	<build>
 		<plugins>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
index 857dada..6f5c0e9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
@@ -21,12 +21,11 @@ package org.apache.flink.api.common.accumulators;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-import com.google.common.collect.Maps;
-
 /**
  * Histogram for discrete-data. Let's you populate a histogram distributedly.
  * Implemented as a Integer->Integer TreeMap, so that the entries are sorted
@@ -39,7 +38,7 @@ public class Histogram implements Accumulator<Integer, Map<Integer, Integer>> {
 
 	private static final long serialVersionUID = 1L;
 
-	private Map<Integer, Integer> treeMap = Maps.newTreeMap();
+	private Map<Integer, Integer> treeMap = new TreeMap<Integer, Integer>();
 
 	@Override
 	public void add(Integer value) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index b9aee28..d39b5ac 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -37,6 +37,18 @@ under the License.
 		<!-- BINARIES -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 90a3bcd..3353515 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -41,37 +41,12 @@ under the License.
 			<excludes>
 				<exclude>org.apache.flink:flink-java-examples</exclude>
 				<exclude>org.apache.flink:flink-scala-examples</exclude>
+				<exclude>org.apache.flink:flink-streaming-examples</exclude>
 				<!--
 				<exclude>**/*javadoc*</exclude>
 				<exclude>**/*sources*</exclude>
 				-->
-				<!--
-				  This is a hardcoded exclude-list containing all libraries that are exclusively used in pact-clients.
-				  The previous command did not work because it also excludes libraries that should be in lib, such as commons-io.
-				-->
-				<exclude>commons-fileupload:commons-fileupload</exclude>
-			</excludes>
-		</dependencySet>
-		
-		<dependencySet>
-			<outputDirectory>lib_clients</outputDirectory>
-			<unpack>false</unpack>
-			<useTransitiveDependencies>true</useTransitiveDependencies>
-			<useProjectArtifact>false</useProjectArtifact>
-			<useProjectAttachments>false</useProjectAttachments>
-			<useTransitiveFiltering>true</useTransitiveFiltering>
-			
-			<includes>
-				<include>org.apache.flink:flink-clients:**</include>
-			</includes>
-			
-			<!--
-			<excludes>
-				<exclude>**/*examples*.jar</exclude>
-				<exclude>**/*javadoc*</exclude>
-				<exclude>**/*sources*</exclude>
 			</excludes>
-			-->
 		</dependencySet>
 	</dependencySets>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-dist/src/main/flink-bin/bin/flink
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink b/flink-dist/src/main/flink-bin/bin/flink
index 4b4cd54..1dbc4f3 100755
--- a/flink-dist/src/main/flink-bin/bin/flink
+++ b/flink-dist/src/main/flink-bin/bin/flink
@@ -27,8 +27,6 @@ if [ "$FLINK_IDENT_STRING" = "" ]; then
         FLINK_IDENT_STRING="$USER"
 fi
 
-FLINK_LIB_CLIENTS_DIR=$FLINK_ROOT_DIR/lib_clients
-
 
 # auxiliary function to construct a lightweight classpath for the
 # Flink CLI client
@@ -41,10 +39,6 @@ constructCLIClientClassPath() {
 			CC_CLASSPATH=$CC_CLASSPATH:$jarfile
 		fi
 	done
-	
-	for jarfile in $FLINK_LIB_CLIENTS_DIR/*.jar ; do
-		CC_CLASSPATH=$CC_CLASSPATH:$jarfile
-	done
 
 	echo $CC_CLASSPATH
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-dist/src/main/flink-bin/bin/webclient.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/webclient.sh b/flink-dist/src/main/flink-bin/bin/webclient.sh
index b45001a..cda41bd 100755
--- a/flink-dist/src/main/flink-bin/bin/webclient.sh
+++ b/flink-dist/src/main/flink-bin/bin/webclient.sh
@@ -30,8 +30,6 @@ if [ "$FLINK_IDENT_STRING" = "" ]; then
         FLINK_IDENT_STRING="$USER"
 fi
 
-FLINK_LIB_CLIENTS_DIR=$FLINK_ROOT_DIR/lib_clients
-
 JVM_ARGS="$JVM_ARGS -Xmx512m"
 
 # auxilliary function to construct the classpath for the webclient
@@ -44,10 +42,6 @@ constructWebclientClassPath() {
 			FLINK_WEBCLIENT_CLASSPATH=$FLINK_WEBCLIENT_CLASSPATH:$jarfile
 		fi
 	done
-	
-	for jarfile in "$FLINK_LIB_CLIENTS_DIR"/*.jar ; do
-		FLINK_WEBCLIENT_CLASSPATH=$FLINK_WEBCLIENT_CLASSPATH:$jarfile
-	done
 
 	echo $FLINK_WEBCLIENT_CLASSPATH
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 2a36dc8..b84a4a0 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -184,6 +184,34 @@ under the License.
 							<groupId>asm</groupId>
 							<artifactId>asm</artifactId>
 						</exclusion>
+						<exclusion>
+							<groupId>tomcat</groupId>
+ 							<artifactId>jasper-compiler</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>tomcat</groupId>
+							<artifactId>jasper-runtime</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jetty</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jsp-api-2.1</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jsp-2.1</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jetty-util</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.eclipse.jdt</groupId>
+							<artifactId>core</artifactId>
+						</exclusion>
 					</exclusions>
 				</dependency>
 			</dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/flink-shaded/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded/pom.xml b/flink-shaded/pom.xml
new file mode 100644
index 0000000..4d117ac
--- /dev/null
+++ b/flink-shaded/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.7-incubating-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-shaded</artifactId>
+	<name>flink-shaded</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<scope>compile</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Relocate the entire Google Guava dependency into a different namespace -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>2.3</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<shadedArtifactAttached>false</shadedArtifactAttached>
+					<createDependencyReducedPom>true</createDependencyReducedPom>
+					<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+					<artifactSet>
+						<includes>
+							<include>com.google.guava:guava</include>
+						</includes>
+					</artifactSet>
+					<relocations>
+						<relocation>
+							<pattern>com.google</pattern>
+							<shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+						</relocation>
+					</relocations>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/758de7db/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 99980ea..3c0b9b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@ under the License.
 	</scm>
 
 	<modules>
+		<module>flink-shaded</module>
 		<module>flink-core</module>
 		<module>flink-java</module>
 		<module>flink-scala</module>
@@ -83,13 +84,13 @@ under the License.
 		<dependency>
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-lang3</artifactId>
-			<version>3.1</version>
+			<version>3.3.2</version>
 		</dependency>
 
 		<dependency>
-		    <groupId>org.slf4j</groupId>
-		    <artifactId>slf4j-api</artifactId>
-		    <version>${slf4j.version}</version>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>${slf4j.version}</version>
 		</dependency>
 
 		<dependency>
@@ -103,13 +104,12 @@ under the License.
 			<artifactId>log4j</artifactId>
 			<version>1.2.17</version>
 		</dependency>
-
+		
+		<!--  global provided guava dependency, prevents guava from being packaged. we use a shaded variant instead -->
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
-			<version>17.0</version>
-			<type>jar</type>
-			<scope>compile</scope>
+			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
@@ -151,8 +151,64 @@ under the License.
 			<type>jar</type>
 			<scope>test</scope>
 		</dependency>
-
 	</dependencies>
+	
+	<!-- this section defines the module versions that are used if nothing else is specified. -->
+	<dependencyManagement>
+		<dependencies>
+			
+			<!-- Make sure we use a consistent guava version throughout the project -->
+			<dependency>
+				<groupId>com.google.guava</groupId>
+				<artifactId>guava</artifactId>
+				<version>17.0</version>
+				<scope>compile</scope>
+			</dependency>
+		
+			<!-- Make sure we use a consistent jetty version throughout the project -->
+			<dependency>
+				<groupId>org.eclipse.jetty</groupId>
+				<artifactId>jetty-server</artifactId>
+				<version>8.0.0.M1</version>
+			</dependency>
+			<dependency>
+				<groupId>org.eclipse.jetty</groupId>
+				<artifactId>jetty-security</artifactId>
+				<version>8.0.0.M1</version>
+			</dependency>
+			<dependency>
+				<groupId>org.eclipse.jetty</groupId>
+				<artifactId>jetty-servlet</artifactId>
+				<version>8.0.0.M1</version>
+			</dependency>
+			
+			<!-- Make sure we use a consistent avro version throughout the project -->
+			<dependency>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro</artifactId>
+				<version>1.7.6</version>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-mapred</artifactId>
+				<version>1.7.6</version>
+			</dependency>
+			
+			<!-- Managed dependency required for HBase in flink-hbase  -->
+			<dependency>
+				<groupId>org.javassist</groupId>
+				<artifactId>javassist</artifactId>
+				<version>3.18.1-GA</version>
+			</dependency>
+			
+			<!-- stax is pulled in different versions by different transitive dependencies-->
+			<dependency>
+				<groupId>stax</groupId>
+				<artifactId>stax-api</artifactId>
+				<version>1.0.1</version>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
 
 	<profiles>
 		<profile>
@@ -359,45 +415,6 @@ under the License.
 		</profile>
 	</profiles>
 
-	<dependencyManagement>
-		<!-- this section defines the module versions that are used if nothing else is specified. -->
-		<dependencies>
-			<!-- Make sure we use a consistent jetty version throughout the project -->
-			<dependency>
-				<groupId>org.eclipse.jetty</groupId>
-				<artifactId>jetty-server</artifactId>
-				<version>8.0.0.M1</version>
-			</dependency>
-			<dependency>
-				<groupId>org.eclipse.jetty</groupId>
-				<artifactId>jetty-security</artifactId>
-				<version>8.0.0.M1</version>
-			</dependency>
-			<dependency>
-				<groupId>org.eclipse.jetty</groupId>
-				<artifactId>jetty-servlet</artifactId>
-				<version>8.0.0.M1</version>
-			</dependency>
-			<!-- Make sure we use a consistent avro version throughout the project -->
-			<dependency>
-				<groupId>org.apache.avro</groupId>
-				<artifactId>avro</artifactId>
-				<version>1.7.6</version>
-			</dependency>
-			<dependency>
-				<groupId>org.apache.avro</groupId>
-				<artifactId>avro-mapred</artifactId>
-				<version>1.7.6</version>
-			</dependency>
-			<!-- Managed dependency required for HBase in pact-hbase  -->
-			<dependency>
-				<groupId>org.javassist</groupId>
-				<artifactId>javassist</artifactId>
-				<version>3.18.1-GA</version>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
-
 	<reporting>
 		<plugins>
 			<!-- execution of Unit Tests -->
@@ -424,33 +441,6 @@ under the License.
 	<build>
 		<plugins>
 			<plugin>
-		        <groupId>org.apache.maven.plugins</groupId>
-		        <artifactId>maven-shade-plugin</artifactId>
-		        <version>2.3</version>
-		        <executions>
-		          <execution>
-		            <phase>package</phase>
-		            <goals>
-		              <goal>shade</goal>
-		            </goals>
-					</execution>
-		        </executions>
-		            <configuration>
-		            	<createDependencyReducedPom>false</createDependencyReducedPom>
-			            <artifactSet>
-			                <includes>
-			                    <include>com.google.guava:guava</include>
-			                </includes>
-			            </artifactSet>
-			              <relocations>
-			                <relocation>
-			                    <pattern>com.google</pattern>
-			                    <shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
-			                </relocation>
-			              </relocations>
-		            </configuration>
-		      </plugin>
-			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
 				<version>2.4</version><!--$NO-MVN-MAN-VER$-->
@@ -463,6 +453,37 @@ under the License.
 					</archive>
 				</configuration>
 			</plugin>
+			
+			<!-- Relocate references to Google Guava classes into a different namespace -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>2.3</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<shadedArtifactAttached>false</shadedArtifactAttached>
+					<createDependencyReducedPom>false</createDependencyReducedPom>
+					<artifactSet>
+						<includes>
+							<include>org.apache.flink:${project.artifact}</include>
+						</includes>
+					</artifactSet>
+					<relocations>
+						<relocation>
+							<pattern>com.google</pattern>
+							<shadedPattern>org.apache.flink.shaded.com.google</shadedPattern>
+						</relocation>
+					</relocations>
+				</configuration>
+			</plugin>
+			
 			<plugin>
 				<groupId>org.apache.rat</groupId>
 				<artifactId>apache-rat-plugin</artifactId>
@@ -531,6 +552,7 @@ under the License.
 						<exclude>tools/maven/checkstyle.xml</exclude>
 						<exclude>tools/maven/scalastyle-config.xml</exclude>
 						<exclude>**/scalastyle-output.xml</exclude>
+						<exclude>**/dependency-reduced-pom.xml</exclude>
 						<exclude>tools/maven/suppressions.xml</exclude>
 						<exclude>**/pom.xml</exclude>
 						<exclude>**/pom.hadoop2.xml</exclude>


[5/9] git commit: Deactivate shade plugin for quickstart projects

Posted by se...@apache.org.
Deactivate shade plugin for quickstart projects


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/38e4755a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/38e4755a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/38e4755a

Branch: refs/heads/master
Commit: 38e4755aa49e9849103f1e0ac35c04fe0b638bdb
Parents: 758de7d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 29 22:27:35 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200

----------------------------------------------------------------------
 flink-quickstart/pom.xml | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/38e4755a/flink-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index fbd0007..cf06a26 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -57,11 +57,23 @@ under the License.
 		<plugins>
 			<plugin>
 				<artifactId>maven-archetype-plugin</artifactId>
-				<version>2.2</version>
+				<version>2.2</version><!--$NO-MVN-MAN-VER$-->
 				<configuration>
 					<skip>${skipTests}</skip>
 				</configuration>
 			</plugin>
+			
+			<!-- deactivate the shade plugin for the quickstart scripts -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase/>
+					</execution>
+				</executions>
+			</plugin>
+			
 		</plugins>
 	</build>
 </project>
\ No newline at end of file


[8/9] git commit: - Allow dependencies to retain guava - Clean dependencies in flink-streaming connectors

Posted by se...@apache.org.
- Allow dependencies to retain guava
- Clean dependencies in flink-streaming connectors


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/719e2889
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/719e2889
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/719e2889

Branch: refs/heads/master
Commit: 719e2889395d7565bab4e7b0d994e2db32e592ba
Parents: 626d6b7
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 30 14:16:16 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:59 2014 +0200

----------------------------------------------------------------------
 flink-addons/flink-avro/pom.xml                 |   1 +
 flink-addons/flink-hadoop-compatibility/pom.xml |  34 +++++
 .../mapred/record/HadoopDataSink.java           |  18 +--
 .../mapred/record/HadoopDataSource.java         |  14 +-
 flink-addons/flink-hbase/pom.xml                |  37 +++--
 .../flink-streaming-connectors/pom.xml          |  12 +-
 flink-addons/flink-yarn/pom.xml                 | 144 +++++++++++++++++++
 flink-clients/pom.xml                           |  15 +-
 flink-compiler/pom.xml                          |   8 ++
 flink-core/pom.xml                              |  20 ++-
 .../typeutils/SerializerTestInstance.java       |   4 +-
 flink-dist/pom.xml                              |  18 +--
 flink-dist/src/main/assemblies/bin.xml          |   3 +
 flink-dist/src/main/assemblies/yarn-uberjar.xml |   1 +
 flink-dist/src/main/assemblies/yarn.xml         |   2 +
 flink-java/pom.xml                              |  57 +++++---
 flink-runtime/pom.xml                           |  77 +++++++++-
 flink-scala/pom.xml                             |   8 ++
 flink-shaded/pom.xml                            |   1 +
 flink-test-utils/pom.xml                        |   8 ++
 flink-tests/pom.xml                             |   8 ++
 pom.xml                                         |  32 ++---
 22 files changed, 432 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml
index ff4decc..2d22507 100644
--- a/flink-addons/flink-avro/pom.xml
+++ b/flink-addons/flink-avro/pom.xml
@@ -58,6 +58,7 @@ under the License.
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
 			<scope>provided</scope>
 		</dependency>
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/pom.xml b/flink-addons/flink-hadoop-compatibility/pom.xml
index a8ebe34..d5c42c2 100644
--- a/flink-addons/flink-hadoop-compatibility/pom.xml
+++ b/flink-addons/flink-hadoop-compatibility/pom.xml
@@ -75,6 +75,40 @@ under the License.
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-mapreduce-client-core</artifactId>
 					<version>${hadoop.version}</version>
+					<exclusions>
+						<exclusion>
+							<groupId>asm</groupId>
+							<artifactId>asm</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>tomcat</groupId>
+							<artifactId>jasper-compiler</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>tomcat</groupId>
+							<artifactId>jasper-runtime</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jetty</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jsp-api-2.1</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jsp-2.1</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jetty-util</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.eclipse.jdt</groupId>
+							<artifactId>core</artifactId>
+						</exclusion>
+					</exclusions>
 				</dependency>
 			</dependencies>
 		</profile>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
index 38e166a..ff28a59 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.hadoopcompatibility.mapred.record;
 
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.operators.Operator;
@@ -31,9 +32,6 @@ import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
 /**
  * The HadoopDataSink is a generic wrapper for all Hadoop OutputFormats.
  *
@@ -54,7 +52,7 @@ public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable
 	private JobConf jobConf;
 
 	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, name, ImmutableList.<Operator<Record>>of(input), conv, keyClass, valueClass);
+		this(hadoopFormat, jobConf, name, Collections.<Operator<Record>>singletonList(input), conv, keyClass, valueClass);
 	}
 
 	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
@@ -74,8 +72,11 @@ public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable
 	@SuppressWarnings("deprecation")
 	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
 		super(new HadoopRecordOutputFormat<K,V>(hadoopFormat, jobConf, conv),input, name);
-		Preconditions.checkNotNull(hadoopFormat);
-		Preconditions.checkNotNull(jobConf);
+		
+		if (hadoopFormat == null || jobConf == null) {
+			throw new NullPointerException();
+		}
+		
 		this.name = name;
 		this.jobConf = jobConf;
 		jobConf.setOutputKeyClass(keyClass);
@@ -101,7 +102,8 @@ public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable
 	@Override
 	public void check() {
 		// see for more details https://github.com/stratosphere/stratosphere/pull/531
-		Preconditions.checkNotNull(FileOutputFormat.getOutputPath(jobConf), "The HadoopDataSink currently expects a correct outputPath.");
+		if (FileOutputFormat.getOutputPath(jobConf) == null) {
+			throw new NullPointerException("The HadoopDataSink currently expects a correct outputPath.");
+		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
index 4f56289..508f069 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.hadoopcompatibility.mapred.record;
 
 
@@ -26,10 +25,6 @@ import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeCo
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 
-import com.google.common.base.Preconditions;
-
-
-
 /**
  * The HadoopDataSource is a generic wrapper for all Hadoop InputFormats.
  * 
@@ -44,7 +39,6 @@ import com.google.common.base.Preconditions;
  * The HadoopDataSource provides two different standard converters:
  * * WritableWrapperConverter: Converts Hadoop Types to a record that contains a WritableComparableWrapper (key) and a WritableWrapper
  * * DefaultHadoopTypeConverter: Converts the standard hadoop types (longWritable, Text) to Flinks's {@link org.apache.flink.types.Value} types.
- *
  */
 public class HadoopDataSource<K,V> extends GenericDataSource<HadoopRecordInputFormat<K,V>> {
 
@@ -61,9 +55,11 @@ public class HadoopDataSource<K,V> extends GenericDataSource<HadoopRecordInputFo
 	 */
 	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter<K,V> conv) {
 		super(new HadoopRecordInputFormat<K,V>(hadoopFormat, jobConf, conv),name);
-		Preconditions.checkNotNull(hadoopFormat);
-		Preconditions.checkNotNull(jobConf);
-		Preconditions.checkNotNull(conv);
+		
+		if (hadoopFormat == null || jobConf == null || conv == null) {
+			throw new NullPointerException();
+		}
+		
 		this.name = name;
 		this.jobConf = jobConf;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml
index 5ae9174..c21c3ae 100644
--- a/flink-addons/flink-hbase/pom.xml
+++ b/flink-addons/flink-hbase/pom.xml
@@ -85,6 +85,34 @@ under the License.
 					<groupId>asm</groupId>
 					<artifactId>asm</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-api-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.eclipse.jdt</groupId>
+					<artifactId>core</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 	</dependencies>
@@ -104,13 +132,4 @@ under the License.
 		to use profiles See ticket https://issues.apache.org/jira/browse/HADOOP-8009 
 		for description of hadoop-clients -->
 
-	<reporting>
-		<plugins>
-		</plugins>
-	</reporting>
-
-	<build>
-		<plugins>
-		</plugins>
-	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index e941112..51a1621 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -59,6 +59,10 @@ under the License.
 					<groupId>org.slf4j</groupId>
 					<artifactId>slf4j-simple</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>org.apache.zookeeper</groupId>
+					<artifactId>zookeeper</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
 
@@ -72,9 +76,15 @@ under the License.
 			<groupId>org.apache.flume</groupId>
 			<artifactId>flume-ng-core</artifactId>
 			<version>1.5.0</version>
+			<exclusions>
+				<exclusion>
+					<groupId>io.netty</groupId>
+					<artifactId>netty</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
-			<dependency>
+		<dependency>
 			<groupId>com.twitter</groupId>
 			<artifactId>hbc-core</artifactId>
 			<version>2.2.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-addons/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/pom.xml b/flink-addons/flink-yarn/pom.xml
index 1f6e8f8..3b6e631 100644
--- a/flink-addons/flink-yarn/pom.xml
+++ b/flink-addons/flink-yarn/pom.xml
@@ -51,28 +51,172 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 		
+		<!--  guava needs to be in "provided" scope, so to not re-export the dependency -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-yarn-client</artifactId>
 			<version>${hadoop.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>asm</groupId>
+					<artifactId>asm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+						<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-api-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.eclipse.jdt</groupId>
+					<artifactId>core</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-common</artifactId>
 			<version>${hadoop.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>asm</groupId>
+					<artifactId>asm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+						<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-api-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.eclipse.jdt</groupId>
+					<artifactId>core</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>
 			<version>${hadoop.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>asm</groupId>
+					<artifactId>asm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+						<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-api-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.eclipse.jdt</groupId>
+					<artifactId>core</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-mapreduce-client-core</artifactId>
 			<version>${hadoop.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>asm</groupId>
+					<artifactId>asm</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+						<artifactId>jasper-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>tomcat</groupId>
+					<artifactId>jasper-runtime</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-api-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jsp-2.1</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.eclipse.jdt</groupId>
+					<artifactId>core</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency> 
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 9dccb85..dcb0add 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -95,6 +95,19 @@ under the License.
 			<version>2.4</version>
 			<scope>compile</scope>
 		</dependency>
+		
+		<!--  guava needs to be in "provided" scope, so to not re-export the dependency -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		
+		<dependency>
+			<groupId>commons-cli</groupId>
+			<artifactId>commons-cli</artifactId>
+		</dependency>
 	</dependencies>
 
 	<!-- More information on this:
@@ -198,8 +211,6 @@ under the License.
 										<ignore/>
 									</action>
 								</pluginExecution>
-								
-								
 							</pluginExecutions>
 						</lifecycleMappingMetadata>
 					</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-compiler/pom.xml
----------------------------------------------------------------------
diff --git a/flink-compiler/pom.xml b/flink-compiler/pom.xml
index fa4c0e9..3727548 100644
--- a/flink-compiler/pom.xml
+++ b/flink-compiler/pom.xml
@@ -56,6 +56,14 @@ under the License.
 			<scope>compile</scope>
 		</dependency>
 		
+		<!--  guava needs to be in "provided" scope, so to not re-export the dependency -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index b3ca398..c1e1b18 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -36,9 +36,23 @@ under the License.
 
 	<dependencies>
 		<dependency>
-	                <groupId>org.apache.flink</groupId>
-	                <artifactId>flink-shaded</artifactId>
-	                <version>${project.version}</version>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		
+		<!--  guava needs to be in "provided" scope, so to not re-export the dependency -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
 		</dependency>
 	</dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
index 5b63633..c48e879 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
@@ -16,14 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 
-public class
-		SerializerTestInstance<T> extends SerializerTestBase<T> {
+public class SerializerTestInstance<T> extends SerializerTestBase<T> {
 
 	private final TypeSerializer<T> serializer;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index d39b5ac..29f3341 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -117,6 +117,12 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-connectors</artifactId>
 			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<dependency>
@@ -137,7 +143,7 @@ under the License.
 				</property>
 			</activation>
 			<dependencies>
-				<!-- No extra dependencies: pact-hbase is currently not compatible with Hadoop v1 -->
+				<!-- No extra dependencies: flink-hbase is currently not compatible with Hadoop v1 -->
 			</dependencies>
 		</profile>
 
@@ -310,16 +316,6 @@ under the License.
 											</mapper>
 										</data>
 										<data>
-											<src>${project.build.directory}/${project.artifactId}-${project.version}-bin/flink-${project.version}/lib_clients</src>
-											<type>directory</type>
-											<mapper>
-												<type>perm</type>
-												<prefix>/usr/share/${project.artifactId}/lib_clients</prefix>
-												<user>root</user>
-												<group>root</group>
-											</mapper>
-										</data>
-										<data>
 											<src>${project.build.directory}/${project.artifactId}-${project.version}-bin/flink-${project.version}/resources</src>
 											<type>directory</type>
 											<mapper>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 3353515..0dc0196 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -140,6 +140,7 @@ under the License.
 			</includes>
 			<excludes>
 				<exclude>flink-java-examples-${project.version}.jar</exclude>
+				<exclude>original-flink-java-examples-${project.version}.jar</exclude>
 				<exclude>flink-java-examples-${project.version}-sources.jar</exclude>
 			</excludes>
 		</fileSet>
@@ -154,7 +155,9 @@ under the License.
 			</includes>
 			<excludes>
 				<exclude>flink-scala-examples-${project.version}.jar</exclude>
+				<exclude>original-flink-scala-examples-${project.version}.jar</exclude>
 				<exclude>flink-scala-examples-${project.version}-sources.jar</exclude>
+				<exclude>flink-scala-examples-${project.version}-javadoc.jar</exclude>
 			</excludes>
 		</fileSet>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-dist/src/main/assemblies/yarn-uberjar.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/yarn-uberjar.xml b/flink-dist/src/main/assemblies/yarn-uberjar.xml
index 0bb1ea2..54be9de 100644
--- a/flink-dist/src/main/assemblies/yarn-uberjar.xml
+++ b/flink-dist/src/main/assemblies/yarn-uberjar.xml
@@ -39,6 +39,7 @@ under the License.
 			<excludes>
 				<exclude>org.apache.flink:flink-java-examples:*</exclude>
 				<exclude>org.apache.flink:flink-scala-examples:*</exclude>
+				<exclude>org.apache.flink:flink-streaming-examples:*</exclude>
 			</excludes>
 		</dependencySet>
 	</dependencySets>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-dist/src/main/assemblies/yarn.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/yarn.xml b/flink-dist/src/main/assemblies/yarn.xml
index 1d062fe..6cb7d3d 100644
--- a/flink-dist/src/main/assemblies/yarn.xml
+++ b/flink-dist/src/main/assemblies/yarn.xml
@@ -87,6 +87,7 @@ under the License.
 			</includes>
 			<excludes>
 				<exclude>flink-java-examples-${project.version}.jar</exclude>
+				<exclude>original-flink-java-examples-${project.version}.jar</exclude>
 				<exclude>flink-java-examples-${project.version}-sources.jar</exclude>
 			</excludes>
 		</fileSet>
@@ -101,6 +102,7 @@ under the License.
 			</includes>
 			<excludes>
 				<exclude>flink-scala-examples-${project.version}.jar</exclude>
+				<exclude>original-flink-scala-examples-${project.version}.jar</exclude>
 				<exclude>flink-scala-examples-${project.version}-sources.jar</exclude>
 			</excludes>
 		</fileSet>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 5165a78..4b85537 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -40,39 +40,50 @@ under the License.
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
+		
 		<dependency>
 			<groupId>org.apache.avro</groupId>
 			<artifactId>avro</artifactId>
 			<!-- version is derived from base module -->
 		</dependency>
+		
 		<dependency>
 			<groupId>com.esotericsoftware.kryo</groupId>
 			<artifactId>kryo</artifactId>
 			<version>2.24.0</version>
 		</dependency>
+		
+		<!--  guava needs to be in "provided" scope, so to not re-export the dependency -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
-    <!-- Because flink-scala uses it in tests -->
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>test-jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
+	<!-- Because flink-scala uses it in tests -->
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index b84a4a0..ddaf452 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -16,7 +16,6 @@ software distributed under the License is distributed on an
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
-
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
@@ -45,7 +44,14 @@ under the License.
 		<dependency>
 			<groupId>commons-cli</groupId>
 			<artifactId>commons-cli</artifactId>
-			<version>1.2</version>
+		</dependency>
+		
+		<!--  guava needs to be in "provided" scope, so to not re-export the dependency -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
 		</dependency>
 		
 		<dependency>
@@ -186,7 +192,7 @@ under the License.
 						</exclusion>
 						<exclusion>
 							<groupId>tomcat</groupId>
- 							<artifactId>jasper-compiler</artifactId>
+							<artifactId>jasper-compiler</artifactId>
 						</exclusion>
 						<exclusion>
 							<groupId>tomcat</groupId>
@@ -225,7 +231,6 @@ under the License.
 				</property>
 			</activation>
 			<dependencies>
-				<!-- YARN -->
 				<dependency>
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-common</artifactId>
@@ -234,6 +239,38 @@ under the License.
 							<groupId>asm</groupId>
 							<artifactId>asm</artifactId>
 						</exclusion>
+						<exclusion>
+							<groupId>tomcat</groupId>
+								<artifactId>jasper-compiler</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>tomcat</groupId>
+							<artifactId>jasper-runtime</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jetty</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jsp-api-2.1</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jsp-2.1</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jetty-util</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.eclipse.jdt</groupId>
+							<artifactId>core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>javax.servlet</groupId>
+							<artifactId>servlet-api</artifactId>
+						</exclusion>
 					</exclusions>
 				</dependency>
 				<dependency>
@@ -244,6 +281,38 @@ under the License.
 							<groupId>asm</groupId>
 							<artifactId>asm</artifactId>
 						</exclusion>
+						<exclusion>
+							<groupId>tomcat</groupId>
+								<artifactId>jasper-compiler</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>tomcat</groupId>
+							<artifactId>jasper-runtime</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jetty</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jsp-api-2.1</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jsp-2.1</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.mortbay.jetty</groupId>
+							<artifactId>jetty-util</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>org.eclipse.jdt</groupId>
+							<artifactId>core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>javax.servlet</groupId>
+							<artifactId>servlet-api</artifactId>
+						</exclusion>
 					</exclusions>
 				</dependency>
 			</dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index e0b9aeb..87bd3fb 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -90,6 +90,14 @@ under the License.
 			<artifactId>asm</artifactId>
 			<version>4.0</version>
 		</dependency>
+		
+		<!--  guava needs to be in "provided" scope, so to not re-export the dependency -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>
 
 		<dependency>
 			<groupId>org.scalatest</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-shaded/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded/pom.xml b/flink-shaded/pom.xml
index 4d117ac..50f9749 100644
--- a/flink-shaded/pom.xml
+++ b/flink-shaded/pom.xml
@@ -38,6 +38,7 @@ under the License.
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
 			<scope>compile</scope>
 		</dependency>
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils/pom.xml b/flink-test-utils/pom.xml
index 76a5b7d..acf6e85 100644
--- a/flink-test-utils/pom.xml
+++ b/flink-test-utils/pom.xml
@@ -65,5 +65,13 @@ under the License.
 			<artifactId>junit</artifactId>
 			<version>4.11</version>
 		</dependency>
+		
+		<!--  guava needs to be in "provided" scope, so to not re-export the dependency -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index fb36226..f994924 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -91,6 +91,14 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		
+		<!--  guava needs to be in "provided" scope, so as not to re-export the dependency -->
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/719e2889/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3c0b9b7..081ccc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@ under the License.
 		<flink.forkCount>1.5C</flink.forkCount>
 		<flink.reuseForks>true</flink.reuseForks>
 		<slf4j.version>1.7.7</slf4j.version>
+		<guava.version>17.0</guava.version>
 	</properties>
 
 	<dependencies>
@@ -104,13 +105,6 @@ under the License.
 			<artifactId>log4j</artifactId>
 			<version>1.2.17</version>
 		</dependency>
-		
-		<!--  global provided guava dependency, prevents guava from being packaged. we use a shaded variant instead -->
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<scope>provided</scope>
-		</dependency>
 
 		<dependency>
 			<groupId>junit</groupId>
@@ -156,15 +150,7 @@ under the License.
 	<!-- this section defines the module versions that are used if nothing else is specified. -->
 	<dependencyManagement>
 		<dependencies>
-			
-			<!-- Make sure we use a consistent guava version throughout the project -->
-			<dependency>
-				<groupId>com.google.guava</groupId>
-				<artifactId>guava</artifactId>
-				<version>17.0</version>
-				<scope>compile</scope>
-			</dependency>
-		
+
 			<!-- Make sure we use a consistent jetty version throughout the project -->
 			<dependency>
 				<groupId>org.eclipse.jetty</groupId>
@@ -181,6 +167,11 @@ under the License.
 				<artifactId>jetty-servlet</artifactId>
 				<version>8.0.0.M1</version>
 			</dependency>
+			<dependency>
+				<groupId>org.eclipse.jetty</groupId>
+				<artifactId>jetty-util</artifactId>
+				<version>8.0.0.M1</version>
+			</dependency>
 			
 			<!-- Make sure we use a consistent avro version throughout the project -->
 			<dependency>
@@ -190,10 +181,17 @@ under the License.
 			</dependency>
 			<dependency>
 				<groupId>org.apache.avro</groupId>
-				<artifactId>avro-mapred</artifactId>
+				<artifactId>avro-ipc</artifactId>
 				<version>1.7.6</version>
 			</dependency>
 			
+			<!-- Make sure we use a consistent commons-cli version throughout the project -->
+			<dependency>
+				<groupId>commons-cli</groupId>
+				<artifactId>commons-cli</artifactId>
+				<version>1.2</version>
+			</dependency>
+			
 			<!-- Managed dependency required for HBase in flink-hbase  -->
 			<dependency>
 				<groupId>org.javassist</groupId>