You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/04/19 09:47:40 UTC

[2/3] incubator-beam git commit: [BEAM-196] abstraction for PipelineOptions serialization

[BEAM-196] abstraction for PipelineOptions serialization


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81577b31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81577b31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81577b31

Branch: refs/heads/master
Commit: 81577b31c2642522f7dd4ba8eba794df48a0ca56
Parents: 56e28a9
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Apr 18 17:40:38 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Apr 18 18:10:05 2016 +0200

----------------------------------------------------------------------
 .../utils/SerializedPipelineOptions.java        | 54 ++++++++++++++++
 .../beam/runners/flink/PipelineOptionsTest.java | 68 ++++++++++++++++++++
 2 files changed, 122 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81577b31/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
new file mode 100644
index 0000000..7439e02
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.translation.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  private byte[] serializedOptions;
+
+  public SerializedPipelineOptions(PipelineOptions options) {
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      new ObjectMapper().writeValue(baos, options);
+      this.serializedOptions = baos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+
+  }
+
+  public PipelineOptions deserializeOptions() {
+    try {
+      return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+    } catch (IOException e) {
+      throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81577b31/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
new file mode 100644
index 0000000..464c6df
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests the serialization and deserialization of PipelineOptions.
+ */
+public class PipelineOptionsTest {
+
+  private interface MyOptions extends FlinkPipelineOptions {
+    @Description("Bla bla bla")
+    @Default.String("Hello")
+    String getTestOption();
+    void setTestOption(String value);
+  }
+
+  private static MyOptions options;
+  private static SerializedPipelineOptions serializedOptions;
+
+  private final static String[] args = new String[]{"--testOption=nothing"};
+
+  @BeforeClass
+  public static void beforeTest() {
+    options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
+    serializedOptions = new SerializedPipelineOptions(options);
+  }
+
+  @Test
+  public void testDeserialization() {
+    MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
+    assertEquals("nothing", deserializedOptions.getTestOption());
+  }
+
+  @Test
+  public void testCaching() {
+    MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
+    assertNotNull(deserializedOptions);
+    assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
+    assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
+    assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
+  }
+
+}