You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/04/19 09:47:40 UTC
[2/3] incubator-beam git commit: [BEAM-196] abstraction for
PipelineOptions serialization
[BEAM-196] abstraction for PipelineOptions serialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81577b31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81577b31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81577b31
Branch: refs/heads/master
Commit: 81577b31c2642522f7dd4ba8eba794df48a0ca56
Parents: 56e28a9
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Apr 18 17:40:38 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Apr 18 18:10:05 2016 +0200
----------------------------------------------------------------------
.../utils/SerializedPipelineOptions.java | 54 ++++++++++++++++
.../beam/runners/flink/PipelineOptionsTest.java | 68 ++++++++++++++++++++
2 files changed, 122 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81577b31/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
new file mode 100644
index 0000000..7439e02
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink.translation.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+ private byte[] serializedOptions;
+
+ public SerializedPipelineOptions(PipelineOptions options) {
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ new ObjectMapper().writeValue(baos, options);
+ this.serializedOptions = baos.toByteArray();
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+ }
+
+ }
+
+ public PipelineOptions deserializeOptions() {
+ try {
+ return new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81577b31/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
new file mode 100644
index 0000000..464c6df
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests the serialization and deserialization of PipelineOptions.
+ */
+public class PipelineOptionsTest {
+
+ private interface MyOptions extends FlinkPipelineOptions {
+ @Description("Bla bla bla")
+ @Default.String("Hello")
+ String getTestOption();
+ void setTestOption(String value);
+ }
+
+ private static MyOptions options;
+ private static SerializedPipelineOptions serializedOptions;
+
+ private final static String[] args = new String[]{"--testOption=nothing"};
+
+ @BeforeClass
+ public static void beforeTest() {
+ options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
+ serializedOptions = new SerializedPipelineOptions(options);
+ }
+
+ @Test
+ public void testDeserialization() {
+ MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
+ assertEquals("nothing", deserializedOptions.getTestOption());
+ }
+
+ @Test
+ public void testCaching() {
+ MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
+ assertNotNull(deserializedOptions);
+ assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
+ assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
+ assertEquals(deserializedOptions, serializedOptions.getPipelineOptions());
+ }
+
+}