You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/05/31 07:18:28 UTC
[1/3] beam git commit: Make SerializableConfiguration more robust by using Hadoop based serialization
Repository: beam
Updated Branches:
refs/heads/master 2fa24d89c -> 006fde46c
Make SerializableConfiguration more robust by using Hadoop based serialization
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/185deeba
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/185deeba
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/185deeba
Branch: refs/heads/master
Commit: 185deebaa52bbf34592a21d86f316b4204fa09ba
Parents: 636eaff
Author: Ismaël Mejía <ie...@apache.org>
Authored: Sun May 28 11:38:08 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed May 31 09:17:00 2017 +0200
----------------------------------------------------------------------
.../sdk/io/hadoop/SerializableConfiguration.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/185deeba/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
index 8101f4b..33c660a 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
@@ -49,21 +49,21 @@ public class SerializableConfiguration implements Externalizable {
return conf;
}
+
@Override
public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(conf.size());
- for (Map.Entry<String, String> entry : conf) {
- out.writeUTF(entry.getKey());
- out.writeUTF(entry.getValue());
- }
+ out.writeUTF(conf.getClass().getCanonicalName());
+ conf.write(out);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- conf = new Configuration(false);
- int size = in.readInt();
- for (int i = 0; i < size; i++) {
- conf.set(in.readUTF(), in.readUTF());
+ String className = in.readUTF();
+ try {
+ conf = (Configuration) Class.forName(className).newInstance();
+ conf.readFields(in);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IOException("Unable to create configuration: " + e);
}
}
[3/3] beam git commit: This closes #2812
Posted by ie...@apache.org.
This closes #2812
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/006fde46
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/006fde46
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/006fde46
Branch: refs/heads/master
Commit: 006fde46ccef4c57d04b884dd8ed39c5c8e5a1f9
Parents: 2fa24d8 185deeb
Author: Ismaël Mejía <ie...@apache.org>
Authored: Wed May 31 09:17:56 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed May 31 09:17:56 2017 +0200
----------------------------------------------------------------------
.../io/hadoop/SerializableConfiguration.java | 18 ++---
.../hadoop/inputformat/HadoopInputFormatIO.java | 53 ++-----------
.../inputformat/HadoopInputFormatIOTest.java | 80 ++++++++++----------
3 files changed, 56 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Refactor HadoopInputFormatIO to use SerializableConfiguration from hadoop-common
Posted by ie...@apache.org.
Refactor HadoopInputFormatIO to use SerializableConfiguration from hadoop-common
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/636eaff0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/636eaff0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/636eaff0
Branch: refs/heads/master
Commit: 636eaff03646113daf868949734199f5697bdf0d
Parents: 2fa24d8
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue May 2 01:33:27 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed May 31 09:17:00 2017 +0200
----------------------------------------------------------------------
.../hadoop/inputformat/HadoopInputFormatIO.java | 53 ++-----------
.../inputformat/HadoopInputFormatIOTest.java | 80 ++++++++++----------
2 files changed, 47 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/636eaff0/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 336740c..efd47fd 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -23,11 +23,8 @@ import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AtomicDouble;
-import java.io.Externalizable;
import java.io.IOException;
-import java.io.ObjectInput;
import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigDecimal;
@@ -46,6 +43,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
@@ -432,7 +430,7 @@ public class HadoopInputFormatIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- Configuration hadoopConfig = getConfiguration().getHadoopConfiguration();
+ Configuration hadoopConfig = getConfiguration().get();
if (hadoopConfig != null) {
builder.addIfNotNull(DisplayData.item("mapreduce.job.inputformat.class",
hadoopConfig.get("mapreduce.job.inputformat.class"))
@@ -493,7 +491,7 @@ public class HadoopInputFormatIO {
}
createInputFormatInstance();
List<InputSplit> splits =
- inputFormatObj.getSplits(Job.getInstance(conf.getHadoopConfiguration()));
+ inputFormatObj.getSplits(Job.getInstance(conf.get()));
if (splits == null) {
throw new IOException("Error in computing splits, getSplits() returns null.");
}
@@ -520,12 +518,12 @@ public class HadoopInputFormatIO {
if (inputFormatObj == null) {
try {
taskAttemptContext =
- new TaskAttemptContextImpl(conf.getHadoopConfiguration(), new TaskAttemptID());
+ new TaskAttemptContextImpl(conf.get(), new TaskAttemptID());
inputFormatObj =
(InputFormat<?, ?>) conf
- .getHadoopConfiguration()
+ .get()
.getClassByName(
- conf.getHadoopConfiguration().get("mapreduce.job.inputformat.class"))
+ conf.get().get("mapreduce.job.inputformat.class"))
.newInstance();
/*
* If InputFormat explicitly implements interface {@link Configurable}, then setConf()
@@ -535,7 +533,7 @@ public class HadoopInputFormatIO {
* org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat}, etc.
*/
if (Configurable.class.isAssignableFrom(inputFormatObj.getClass())) {
- ((Configurable) inputFormatObj).setConf(conf.getHadoopConfiguration());
+ ((Configurable) inputFormatObj).setConf(conf.get());
}
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new IOException("Unable to create InputFormat object: ", e);
@@ -802,41 +800,4 @@ public class HadoopInputFormatIO {
new ObjectWritable(inputSplit).write(out);
}
}
-
- /**
- * A wrapper to allow Hadoop {@link org.apache.hadoop.conf.Configuration} to be serialized using
- * Java's standard serialization mechanisms. Note that the org.apache.hadoop.conf.Configuration
- * is Writable.
- */
- public static class SerializableConfiguration implements Externalizable {
-
- private Configuration conf;
-
- public SerializableConfiguration() {}
-
- public SerializableConfiguration(Configuration conf) {
- this.conf = conf;
- }
-
- public Configuration getHadoopConfiguration() {
- return conf;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(conf.getClass().getCanonicalName());
- ((Writable) conf).write(out);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- String className = in.readUTF();
- try {
- conf = (Configuration) Class.forName(className).newInstance();
- conf.readFields(in);
- } catch (InstantiationException | IllegalAccessException e) {
- throw new IOException("Unable to create configuration: " + e);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/636eaff0/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index aeeeb17..9ec3838 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -26,11 +26,11 @@ import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.hadoop.WritableCoder;
import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.EmployeeRecordReader;
import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.NewObjectsEmployeeInputSplit;
import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.HadoopInputFormatBoundedSource;
-import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableConfiguration;
import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableSplit;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
@@ -94,11 +94,11 @@ public class HadoopInputFormatIOTest {
@Test
public void testReadBuildsCorrectly() {
HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
- .withConfiguration(serConf.getHadoopConfiguration())
+ .withConfiguration(serConf.get())
.withKeyTranslation(myKeyTranslate)
.withValueTranslation(myValueTranslate);
- assertEquals(serConf.getHadoopConfiguration(),
- read.getConfiguration().getHadoopConfiguration());
+ assertEquals(serConf.get(),
+ read.getConfiguration().get());
assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
assertEquals(myValueTranslate, read.getValueTranslationFunction());
assertEquals(myValueTranslate.getOutputTypeDescriptor(), read.getValueTypeDescriptor());
@@ -116,10 +116,10 @@ public class HadoopInputFormatIOTest {
HadoopInputFormatIO.Read<String, String> read =
HadoopInputFormatIO.<String, String>read()
.withValueTranslation(myValueTranslate)
- .withConfiguration(serConf.getHadoopConfiguration())
+ .withConfiguration(serConf.get())
.withKeyTranslation(myKeyTranslate);
- assertEquals(serConf.getHadoopConfiguration(),
- read.getConfiguration().getHadoopConfiguration());
+ assertEquals(serConf.get(),
+ read.getConfiguration().get());
assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
assertEquals(myValueTranslate, read.getValueTranslationFunction());
assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
@@ -142,15 +142,15 @@ public class HadoopInputFormatIOTest {
Employee.class,
Text.class);
HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
- .withConfiguration(serConf.getHadoopConfiguration())
+ .withConfiguration(serConf.get())
.withKeyTranslation(myKeyTranslate)
- .withConfiguration(diffConf.getHadoopConfiguration());
- assertEquals(diffConf.getHadoopConfiguration(),
- read.getConfiguration().getHadoopConfiguration());
+ .withConfiguration(diffConf.get());
+ assertEquals(diffConf.get(),
+ read.getConfiguration().get());
assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
assertEquals(null, read.getValueTranslationFunction());
assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor());
- assertEquals(diffConf.getHadoopConfiguration().getClass("value.class", Object.class), read
+ assertEquals(diffConf.get().getClass("value.class", Object.class), read
.getValueTypeDescriptor().getRawType());
}
@@ -173,14 +173,14 @@ public class HadoopInputFormatIOTest {
@Test
public void testReadObjectCreationWithConfiguration() {
HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read()
- .withConfiguration(serConf.getHadoopConfiguration());
- assertEquals(serConf.getHadoopConfiguration(),
- read.getConfiguration().getHadoopConfiguration());
+ .withConfiguration(serConf.get());
+ assertEquals(serConf.get(),
+ read.getConfiguration().get());
assertEquals(null, read.getKeyTranslationFunction());
assertEquals(null, read.getValueTranslationFunction());
- assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class), read
+ assertEquals(serConf.get().getClass("key.class", Object.class), read
.getKeyTypeDescriptor().getRawType());
- assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class), read
+ assertEquals(serConf.get().getClass("value.class", Object.class), read
.getValueTypeDescriptor().getRawType());
}
@@ -194,7 +194,7 @@ public class HadoopInputFormatIOTest {
public void testReadObjectCreationFailsIfKeyTranslationFunctionIsNull() {
thrown.expect(NullPointerException.class);
HadoopInputFormatIO.<String, Employee>read()
- .withConfiguration(serConf.getHadoopConfiguration())
+ .withConfiguration(serConf.get())
.withKeyTranslation(null);
}
@@ -205,15 +205,15 @@ public class HadoopInputFormatIOTest {
@Test
public void testReadObjectCreationWithConfigurationKeyTranslation() {
HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
- .withConfiguration(serConf.getHadoopConfiguration())
+ .withConfiguration(serConf.get())
.withKeyTranslation(myKeyTranslate);
- assertEquals(serConf.getHadoopConfiguration(),
- read.getConfiguration().getHadoopConfiguration());
+ assertEquals(serConf.get(),
+ read.getConfiguration().get());
assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
assertEquals(null, read.getValueTranslationFunction());
assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(),
read.getKeyTypeDescriptor().getRawType());
- assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class),
+ assertEquals(serConf.get().getClass("value.class", Object.class),
read.getValueTypeDescriptor().getRawType());
}
@@ -227,7 +227,7 @@ public class HadoopInputFormatIOTest {
public void testReadObjectCreationFailsIfValueTranslationFunctionIsNull() {
thrown.expect(NullPointerException.class);
HadoopInputFormatIO.<Text, String>read()
- .withConfiguration(serConf.getHadoopConfiguration())
+ .withConfiguration(serConf.get())
.withValueTranslation(null);
}
@@ -238,13 +238,13 @@ public class HadoopInputFormatIOTest {
@Test
public void testReadObjectCreationWithConfigurationValueTranslation() {
HadoopInputFormatIO.Read<Text, String> read = HadoopInputFormatIO.<Text, String>read()
- .withConfiguration(serConf.getHadoopConfiguration())
+ .withConfiguration(serConf.get())
.withValueTranslation(myValueTranslate);
- assertEquals(serConf.getHadoopConfiguration(),
- read.getConfiguration().getHadoopConfiguration());
+ assertEquals(serConf.get(),
+ read.getConfiguration().get());
assertEquals(null, read.getKeyTranslationFunction());
assertEquals(myValueTranslate, read.getValueTranslationFunction());
- assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class),
+ assertEquals(serConf.get().getClass("key.class", Object.class),
read.getKeyTypeDescriptor().getRawType());
assertEquals(myValueTranslate.getOutputTypeDescriptor().getRawType(),
read.getValueTypeDescriptor().getRawType());
@@ -257,11 +257,11 @@ public class HadoopInputFormatIOTest {
@Test
public void testReadObjectCreationWithConfigurationKeyTranslationValueTranslation() {
HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
- .withConfiguration(serConf.getHadoopConfiguration())
+ .withConfiguration(serConf.get())
.withKeyTranslation(myKeyTranslate)
.withValueTranslation(myValueTranslate);
- assertEquals(serConf.getHadoopConfiguration(),
- read.getConfiguration().getHadoopConfiguration());
+ assertEquals(serConf.get(),
+ read.getConfiguration().get());
assertEquals(myKeyTranslate, read.getKeyTranslationFunction());
assertEquals(myValueTranslate, read.getValueTranslationFunction());
assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(),
@@ -342,13 +342,13 @@ public class HadoopInputFormatIOTest {
}
};
HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
- .withConfiguration(serConf.getHadoopConfiguration())
+ .withConfiguration(serConf.get())
.withKeyTranslation(myKeyTranslateWithWrongInputType);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(String.format(
"Key translation's input type is not same as hadoop InputFormat : %s key " + "class : %s",
- serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
- InputFormat.class), serConf.getHadoopConfiguration()
+ serConf.get().getClass("mapreduce.job.inputformat.class",
+ InputFormat.class), serConf.get()
.getClass("key.class", Object.class)));
read.validateTransform();
}
@@ -370,15 +370,15 @@ public class HadoopInputFormatIOTest {
};
HadoopInputFormatIO.Read<Text, String> read =
HadoopInputFormatIO.<Text, String>read()
- .withConfiguration(serConf.getHadoopConfiguration())
+ .withConfiguration(serConf.get())
.withValueTranslation(myValueTranslateWithWrongInputType);
String expectedMessage =
String.format(
"Value translation's input type is not same as hadoop InputFormat : "
+ "%s value class : %s",
- serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class",
+ serConf.get().getClass("mapreduce.job.inputformat.class",
InputFormat.class),
- serConf.getHadoopConfiguration().getClass("value.class", Object.class));
+ serConf.get().getClass("value.class", Object.class));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(expectedMessage);
read.validateTransform();
@@ -387,7 +387,7 @@ public class HadoopInputFormatIOTest {
@Test
public void testReadingData() throws Exception {
HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read()
- .withConfiguration(serConf.getHadoopConfiguration());
+ .withConfiguration(serConf.get());
List<KV<Text, Employee>> expected = TestEmployeeDataSet.getEmployeeData();
PCollection<KV<Text, Employee>> actual = p.apply("ReadTest", read);
PAssert.that(actual).containsInAnyOrder(expected);
@@ -413,11 +413,11 @@ public class HadoopInputFormatIOTest {
assertThat(
displayData,
hasDisplayItem("mapreduce.job.inputformat.class",
- serConf.getHadoopConfiguration().get("mapreduce.job.inputformat.class")));
+ serConf.get().get("mapreduce.job.inputformat.class")));
assertThat(displayData,
- hasDisplayItem("key.class", serConf.getHadoopConfiguration().get("key.class")));
+ hasDisplayItem("key.class", serConf.get().get("key.class")));
assertThat(displayData,
- hasDisplayItem("value.class", serConf.getHadoopConfiguration().get("value.class")));
+ hasDisplayItem("value.class", serConf.get().get("value.class")));
}
/**