Posted to commits@beam.apache.org by ie...@apache.org on 2017/05/31 07:18:28 UTC

[1/3] beam git commit: Make SerializableConfiguration more robust by using Hadoop based serialization

Repository: beam
Updated Branches:
  refs/heads/master 2fa24d89c -> 006fde46c


Make SerializableConfiguration more robust by using Hadoop based serialization


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/185deeba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/185deeba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/185deeba

Branch: refs/heads/master
Commit: 185deebaa52bbf34592a21d86f316b4204fa09ba
Parents: 636eaff
Author: Ismaël Mejía <ie...@apache.org>
Authored: Sun May 28 11:38:08 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed May 31 09:17:00 2017 +0200

----------------------------------------------------------------------
 .../sdk/io/hadoop/SerializableConfiguration.java  | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/185deeba/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
index 8101f4b..33c660a 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
@@ -49,21 +49,21 @@ public class SerializableConfiguration implements Externalizable {
     return conf;
   }
 
+
   @Override
   public void writeExternal(ObjectOutput out) throws IOException {
-    out.writeInt(conf.size());
-    for (Map.Entry<String, String> entry : conf) {
-      out.writeUTF(entry.getKey());
-      out.writeUTF(entry.getValue());
-    }
+    out.writeUTF(conf.getClass().getCanonicalName());
+    conf.write(out);
   }
 
   @Override
   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-    conf = new Configuration(false);
-    int size = in.readInt();
-    for (int i = 0; i < size; i++) {
-      conf.set(in.readUTF(), in.readUTF());
+    String className = in.readUTF();
+    try {
+      conf = (Configuration) Class.forName(className).newInstance();
+      conf.readFields(in);
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new IOException("Unable to create configuration: " + e);
     }
   }
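
The new hooks delegate to Hadoop's own Writable protocol and record the concrete Configuration subclass (e.g. JobConf), instead of copying entries key by key, so subclass state and Writable semantics survive the round trip. A minimal round-trip sketch, not part of the commit, assuming the hadoop-common SerializableConfiguration keeps a public no-arg constructor (required by Externalizable) and the get() accessor shown above:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
  import org.apache.hadoop.conf.Configuration;

  public class ConfigurationRoundTrip {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration(false);
      conf.set("fs.defaultFS", "hdfs://localhost:9000"); // illustrative value

      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        // Externalizable: writeExternal() emits the class name, then conf.write(out).
        out.writeObject(new SerializableConfiguration(conf));
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        // readExternal() re-instantiates the recorded class and calls readFields(in).
        SerializableConfiguration copy = (SerializableConfiguration) in.readObject();
        System.out.println(copy.get().get("fs.defaultFS")); // prints hdfs://localhost:9000
      }
    }
  }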
 


[3/3] beam git commit: This closes #2812

Posted by ie...@apache.org.
This closes #2812


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/006fde46
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/006fde46
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/006fde46

Branch: refs/heads/master
Commit: 006fde46ccef4c57d04b884dd8ed39c5c8e5a1f9
Parents: 2fa24d8 185deeb
Author: Ismaël Mejía <ie...@apache.org>
Authored: Wed May 31 09:17:56 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed May 31 09:17:56 2017 +0200

----------------------------------------------------------------------
 .../io/hadoop/SerializableConfiguration.java    | 18 ++---
 .../hadoop/inputformat/HadoopInputFormatIO.java | 53 ++-----------
 .../inputformat/HadoopInputFormatIOTest.java    | 80 ++++++++++----------
 3 files changed, 56 insertions(+), 95 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Refactor HadoopInputFormatIO to use SerializableConfiguration from hadoop-common

Posted by ie...@apache.org.
Refactor HadoopInputFormatIO to use SerializableConfiguration from hadoop-common


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/636eaff0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/636eaff0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/636eaff0

Branch: refs/heads/master
Commit: 636eaff03646113daf868949734199f5697bdf0d
Parents: 2fa24d8
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue May 2 01:33:27 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed May 31 09:17:00 2017 +0200

----------------------------------------------------------------------
 .../hadoop/inputformat/HadoopInputFormatIO.java | 53 ++-----------
 .../inputformat/HadoopInputFormatIOTest.java    | 80 ++++++++++----------
 2 files changed, 47 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/636eaff0/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 336740c..efd47fd 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -23,11 +23,8 @@ import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AtomicDouble;
-import java.io.Externalizable;
 import java.io.IOException;
-import java.io.ObjectInput;
 import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.math.BigDecimal;
@@ -46,6 +43,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.io.hadoop.WritableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -432,7 +430,7 @@ public class HadoopInputFormatIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      Configuration hadoopConfig = getConfiguration().getHadoopConfiguration();
+      Configuration hadoopConfig = getConfiguration().get();
       if (hadoopConfig != null) {
         builder.addIfNotNull(DisplayData.item("mapreduce.job.inputformat.class",
             hadoopConfig.get("mapreduce.job.inputformat.class"))
@@ -493,7 +491,7 @@ public class HadoopInputFormatIO {
       }
       createInputFormatInstance();
       List<InputSplit> splits =
-          inputFormatObj.getSplits(Job.getInstance(conf.getHadoopConfiguration()));
+          inputFormatObj.getSplits(Job.getInstance(conf.get()));
       if (splits == null) {
         throw new IOException("Error in computing splits, getSplits() returns null.");
       }
@@ -520,12 +518,12 @@ public class HadoopInputFormatIO {
       if (inputFormatObj == null) {
         try {
           taskAttemptContext =
-              new TaskAttemptContextImpl(conf.getHadoopConfiguration(), new TaskAttemptID());
+              new TaskAttemptContextImpl(conf.get(), new TaskAttemptID());
           inputFormatObj =
               (InputFormat<?, ?>) conf
-                  .getHadoopConfiguration()
+                  .get()
                   .getClassByName(
-                      conf.getHadoopConfiguration().get("mapreduce.job.inputformat.class"))
+                      conf.get().get("mapreduce.job.inputformat.class"))
                   .newInstance();
           /*
            * If InputFormat explicitly implements interface {@link Configurable}, then setConf()
@@ -535,7 +533,7 @@ public class HadoopInputFormatIO {
            * org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat}, etc.
            */
           if (Configurable.class.isAssignableFrom(inputFormatObj.getClass())) {
-            ((Configurable) inputFormatObj).setConf(conf.getHadoopConfiguration());
+            ((Configurable) inputFormatObj).setConf(conf.get());
           }
         } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
           throw new IOException("Unable to create InputFormat object: ", e);
@@ -802,41 +800,4 @@ public class HadoopInputFormatIO {
       new ObjectWritable(inputSplit).write(out);
     }
   }
-
-  /**
-   * A wrapper to allow Hadoop {@link org.apache.hadoop.conf.Configuration} to be serialized using
-   * Java's standard serialization mechanisms. Note that the org.apache.hadoop.conf.Configuration
-   * is Writable.
-   */
-  public static class SerializableConfiguration implements Externalizable {
-
-    private Configuration conf;
-
-    public SerializableConfiguration() {}
-
-    public SerializableConfiguration(Configuration conf) {
-      this.conf = conf;
-    }
-
-    public Configuration getHadoopConfiguration() {
-      return conf;
-    }
-
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-      out.writeUTF(conf.getClass().getCanonicalName());
-      ((Writable) conf).write(out);
-    }
-
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-      String className = in.readUTF();
-      try {
-        conf = (Configuration) Class.forName(className).newInstance();
-        conf.readFields(in);
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IOException("Unable to create configuration: " + e);
-      }
-    }
-  }
 }
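
The call-site change above is mechanical: the transform now holds the shared wrapper from hadoop-common and unwraps it with get() instead of the removed inner class's getHadoopConfiguration(). A hedged usage sketch of the pattern, with illustrative configuration values:

  import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.InputFormat;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

  public class WrapperUsageSketch {
    public static void main(String[] args) {
      // Build the Hadoop configuration the transform will carry.
      Configuration hadoopConf = new Configuration(false);
      hadoopConf.setClass("mapreduce.job.inputformat.class",
          TextInputFormat.class, InputFormat.class);

      // Wrap it once so it can cross Java serialization boundaries...
      SerializableConfiguration serConf = new SerializableConfiguration(hadoopConf);

      // ...and unwrap with get() wherever a raw Configuration is needed.
      // Before this refactor: serConf.getHadoopConfiguration().
      Configuration unwrapped = serConf.get();
      System.out.println(unwrapped.get("mapreduce.job.inputformat.class"));
    }
  }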

http://git-wip-us.apache.org/repos/asf/beam/blob/636eaff0/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index aeeeb17..9ec3838 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -26,11 +26,11 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.io.hadoop.WritableCoder;
 import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.EmployeeRecordReader;
 import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.NewObjectsEmployeeInputSplit;
 import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.HadoopInputFormatBoundedSource;
-import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableConfiguration;
 import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableSplit;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
@@ -94,11 +94,11 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadBuildsCorrectly() {
     HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(myKeyTranslate)
         .withValueTranslation(myValueTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
     assertEquals(myValueTranslate, read.getValueTranslationFunction());
     assertEquals(myValueTranslate.getOutputTypeDescriptor(), read.getValueTypeDescriptor());
@@ -116,10 +116,10 @@ public class HadoopInputFormatIOTest {
     HadoopInputFormatIO.Read<String, String> read =
         HadoopInputFormatIO.<String, String>read()
             .withValueTranslation(myValueTranslate)
-            .withConfiguration(serConf.getHadoopConfiguration())
+            .withConfiguration(serConf.get())
             .withKeyTranslation(myKeyTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
     assertEquals(myValueTranslate, read.getValueTranslationFunction());
     assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
@@ -142,15 +142,15 @@ public class HadoopInputFormatIOTest {
             Employee.class,
             Text.class);
     HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(myKeyTranslate)
-        .withConfiguration(diffConf.getHadoopConfiguration());
-    assertEquals(diffConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+        .withConfiguration(diffConf.get());
+    assertEquals(diffConf.get(),
+        read.getConfiguration().get());
     assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
     assertEquals(null, read.getValueTranslationFunction());
     assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
-    assertEquals(diffConf.getHadoopConfiguration().getClass("value.class", Object.class), read
+    assertEquals(diffConf.get().getClass("value.class", Object.class), read
         .getValueTypeDescriptor().getRawType());
   }
 
@@ -173,14 +173,14 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadObjectCreationWithConfiguration() {
     HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration());
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+        .withConfiguration(serConf.get());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(null, read.getKeyTranslationFunction());
     assertEquals(null, read.getValueTranslationFunction());
-    assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class), read
+    assertEquals(serConf.get().getClass("key.class", Object.class), read
         .getKeyTypeDescriptor().getRawType());
-    assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class), read
+    assertEquals(serConf.get().getClass("value.class", Object.class), read
         .getValueTypeDescriptor().getRawType());
   }
 
@@ -194,7 +194,7 @@ public class HadoopInputFormatIOTest {
   public void testReadObjectCreationFailsIfKeyTranslationFunctionIsNull() {
     thrown.expect(NullPointerException.class);
     HadoopInputFormatIO.<String, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(null);
   }
 
@@ -205,15 +205,15 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadObjectCreationWithConfigurationKeyTranslation() {
     HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(myKeyTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
     assertEquals(null, read.getValueTranslationFunction());
     assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(),
         read.getKeyTypeDescriptor().getRawType());
-    assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class),
+    assertEquals(serConf.get().getClass("value.class", Object.class),
         read.getValueTypeDescriptor().getRawType());
   }
 
@@ -227,7 +227,7 @@ public class HadoopInputFormatIOTest {
   public void testReadObjectCreationFailsIfValueTranslationFunctionIsNull() {
     thrown.expect(NullPointerException.class);
     HadoopInputFormatIO.<Text, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withValueTranslation(null);
   }
 
@@ -238,13 +238,13 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadObjectCreationWithConfigurationValueTranslation() {
     HadoopInputFormatIO.Read<Text, String> read = HadoopInputFormatIO.<Text, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withValueTranslation(myValueTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(null, read.getKeyTranslationFunction());
     assertEquals(myValueTranslate, read.getValueTranslationFunction());
-    assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class),
+    assertEquals(serConf.get().getClass("key.class", Object.class),
         read.getKeyTypeDescriptor().getRawType());
     assertEquals(myValueTranslate.getOutputTypeDescriptor().getRawType(),
         read.getValueTypeDescriptor().getRawType());
@@ -257,11 +257,11 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadObjectCreationWithConfigurationKeyTranslationValueTranslation() {
     HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(myKeyTranslate)
         .withValueTranslation(myValueTranslate);
-    assertEquals(serConf.getHadoopConfiguration(),
-        read.getConfiguration().getHadoopConfiguration());
+    assertEquals(serConf.get(),
+        read.getConfiguration().get());
     assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
     assertEquals(myValueTranslate, read.getValueTranslationFunction());
     assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(),
@@ -342,13 +342,13 @@ public class HadoopInputFormatIOTest {
           }
         };
     HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
+        .withConfiguration(serConf.get())
         .withKeyTranslation(myKeyTranslateWithWrongInputType);
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(String.format(
         "Key translation's input type is not same as hadoop InputFormat : %s key " + "class : %s",
-        serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
-            InputFormat.class), serConf.getHadoopConfiguration()
+        serConf.get().getClass("mapreduce.job.inputformat.class",
+            InputFormat.class), serConf.get()
             .getClass("key.class", Object.class)));
     read.validateTransform();
   }
@@ -370,15 +370,15 @@ public class HadoopInputFormatIOTest {
         };
     HadoopInputFormatIO.Read<Text, String> read =
         HadoopInputFormatIO.<Text, String>read()
-            .withConfiguration(serConf.getHadoopConfiguration())
+            .withConfiguration(serConf.get())
             .withValueTranslation(myValueTranslateWithWrongInputType);
     String expectedMessage =
         String.format(
             "Value translation's input type is not same as hadoop InputFormat :  "
                 + "%s value class : %s",
-            serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
+            serConf.get().getClass("mapreduce.job.inputformat.class",
                 InputFormat.class),
-            serConf.getHadoopConfiguration().getClass("value.class", Object.class));
+            serConf.get().getClass("value.class", Object.class));
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(expectedMessage);
     read.validateTransform();
@@ -387,7 +387,7 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadingData() throws Exception {
     HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read()
-        .withConfiguration(serConf.getHadoopConfiguration());
+        .withConfiguration(serConf.get());
     List<KV<Text, Employee>> expected = TestEmployeeDataSet.getEmployeeData();
     PCollection<KV<Text, Employee>> actual = p.apply("ReadTest", read);
     PAssert.that(actual).containsInAnyOrder(expected);
@@ -413,11 +413,11 @@ public class HadoopInputFormatIOTest {
     assertThat(
         displayData,
         hasDisplayItem("mapreduce.job.inputformat.class",
-            serConf.getHadoopConfiguration().get("mapreduce.job.inputformat.class")));
+            serConf.get().get("mapreduce.job.inputformat.class")));
     assertThat(displayData,
-        hasDisplayItem("key.class", serConf.getHadoopConfiguration().get("key.class")));
+        hasDisplayItem("key.class", serConf.get().get("key.class")));
     assertThat(displayData,
-        hasDisplayItem("value.class", serConf.getHadoopConfiguration().get("value.class")));
+        hasDisplayItem("value.class", serConf.get().get("value.class")));
   }
 
   /**