Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/02 18:17:10 UTC
[2/4] beam git commit: Introduces SerializablePipelineOptions in core-construction
Introduces SerializablePipelineOptions in core-construction.
Removes the analogous classes from the Apex, Flink, and Spark runners, along with their tests.
The analogous class in Spark was SparkRuntimeContext, which also contained a CoderRegistry,
but that CoderRegistry was used only in a class that was itself unused, so I removed that
class as well. This also allows removing a number of Jackson dependencies from the Spark runner.
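
For reference, a minimal usage sketch of the new class (the MyOptions interface below is
hypothetical, modeled on the one in the new test; it assumes only the Beam Java SDK and
runners-core-construction on the classpath). A runner wraps the options at graph-construction
time and recovers them on the worker after Java serialization:

  import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
  import org.apache.beam.sdk.options.Default;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.util.SerializableUtils;

  public class SerializablePipelineOptionsExample {
    /** Hypothetical options interface for illustration. */
    public interface MyOptions extends PipelineOptions {
      @Default.String("bar")
      String getFoo();
      void setFoo(String foo);
    }

    public static void main(String[] args) {
      // Driver side: wrap the options; the wrapper stores them as a JSON string.
      MyOptions options = PipelineOptionsFactory.fromArgs("--foo=bar").as(MyOptions.class);
      SerializablePipelineOptions holder = new SerializablePipelineOptions(options);

      // Worker side (simulated here by cloning through Java serialization): get()
      // returns the deserialized PipelineOptions, viewable as any sub-interface.
      SerializablePipelineOptions copy = SerializableUtils.clone(holder);
      System.out.println(copy.get().as(MyOptions.class).getFoo()); // prints "bar"
    }
  }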
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7db051ae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7db051ae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7db051ae
Branch: refs/heads/master
Commit: 7db051aeae2b8e6b2dbfcc1da31410ec118299f6
Parents: ff4b36c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 28 12:48:41 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Aug 2 11:04:50 2017 -0700
----------------------------------------------------------------------
runners/apex/pom.xml | 8 -
.../operators/ApexGroupByKeyOperator.java | 6 +-
.../operators/ApexParDoOperator.java | 6 +-
.../ApexReadUnboundedInputOperator.java | 6 +-
.../utils/SerializablePipelineOptions.java | 78 ---------
.../translation/utils/PipelineOptionsTest.java | 150 -----------------
runners/core-construction-java/pom.xml | 15 ++
.../SerializablePipelineOptions.java | 74 +++++++++
.../SerializablePipelineOptionsTest.java | 89 ++++++++++
runners/flink/pom.xml | 10 --
.../functions/FlinkDoFnFunction.java | 10 +-
.../FlinkMergingNonShuffleReduceFunction.java | 8 +-
.../functions/FlinkPartialReduceFunction.java | 8 +-
.../functions/FlinkReduceFunction.java | 8 +-
.../functions/FlinkStatefulDoFnFunction.java | 10 +-
.../utils/SerializedPipelineOptions.java | 77 ---------
.../translation/wrappers/SourceInputFormat.java | 10 +-
.../wrappers/streaming/DoFnOperator.java | 10 +-
.../streaming/SplittableDoFnOperator.java | 2 +-
.../streaming/io/BoundedSourceWrapper.java | 10 +-
.../streaming/io/UnboundedSourceWrapper.java | 12 +-
.../beam/runners/flink/PipelineOptionsTest.java | 165 +------------------
runners/spark/pom.xml | 12 --
.../spark/aggregators/NamedAggregators.java | 93 -----------
.../beam/runners/spark/io/SourceDStream.java | 20 +--
.../apache/beam/runners/spark/io/SourceRDD.java | 22 +--
.../runners/spark/io/SparkUnboundedSource.java | 6 +-
.../SparkGroupAlsoByWindowViaWindowSet.java | 10 +-
.../spark/stateful/StateSpecFunctions.java | 8 +-
.../spark/translation/EvaluationContext.java | 11 +-
.../spark/translation/MultiDoFnFunction.java | 16 +-
.../translation/SparkAbstractCombineFn.java | 9 +-
.../spark/translation/SparkGlobalCombineFn.java | 5 +-
...SparkGroupAlsoByWindowViaOutputBufferFn.java | 9 +-
.../spark/translation/SparkKeyedCombineFn.java | 5 +-
.../spark/translation/SparkRuntimeContext.java | 90 ----------
.../spark/translation/TransformTranslator.java | 27 ++-
.../streaming/StreamingTransformTranslator.java | 20 +--
.../translation/SparkRuntimeContextTest.java | 122 --------------
.../beam/sdk/options/PipelineOptions.java | 7 +-
.../apache/beam/sdk/options/ValueProviders.java | 8 +-
41 files changed, 327 insertions(+), 945 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index fd5aafb..96aac8b 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -63,14 +63,6 @@
<version>${apex.malhar.version}</version>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-engine</artifactId>
<version>${apex.core.version}</version>
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 39f681f..5c0d72f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -33,7 +33,6 @@ import java.util.Collections;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
@@ -41,6 +40,7 @@ import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -149,7 +149,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator,
@Override
public void setup(OperatorContext context) {
- this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
+ this.traceTuples =
+ ApexStreamTuple.Logging.isDebugEnabled(
+ serializedOptions.get().as(ApexPipelineOptions.class), this);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index c3cbab2..4dc807d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -40,7 +40,6 @@ import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
-import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.apex.translation.utils.StateInternalsProxy;
import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
import org.apache.beam.runners.core.DoFnRunner;
@@ -64,6 +63,7 @@ import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
@@ -386,7 +386,9 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
@Override
public void setup(OperatorContext context) {
- this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+ this.traceTuples =
+ ApexStreamTuple.Logging.isDebugEnabled(
+ pipelineOptions.get().as(ApexPipelineOptions.class), this);
SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
if (!sideInputs.isEmpty()) {
sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
index 1549560..21fb9d2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
@@ -30,8 +30,8 @@ import java.io.IOException;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple;
-import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ValuesSource;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -119,7 +119,9 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
@Override
public void setup(OperatorContext context) {
- this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+ this.traceTuples =
+ ApexStreamTuple.Logging.isDebugEnabled(
+ pipelineOptions.get().as(ApexPipelineOptions.class), this);
try {
reader = source.createReader(this.pipelineOptions.get(), null);
available = reader.start();
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
deleted file mode 100644
index 46b04fc..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation.utils;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-
-/**
- * A wrapper to enable serialization of {@link PipelineOptions}.
- */
-public class SerializablePipelineOptions implements Externalizable {
-
- /* Used to ensure we initialize file systems exactly once, because it's a slow operation. */
- private static final AtomicBoolean FILE_SYSTEMS_INTIIALIZED = new AtomicBoolean(false);
-
- private transient ApexPipelineOptions pipelineOptions;
-
- public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
- this.pipelineOptions = pipelineOptions;
- }
-
- public SerializablePipelineOptions() {
- }
-
- public ApexPipelineOptions get() {
- return this.pipelineOptions;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(createMapper().writeValueAsString(pipelineOptions));
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- String s = in.readUTF();
- this.pipelineOptions = createMapper().readValue(s, PipelineOptions.class)
- .as(ApexPipelineOptions.class);
-
- if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
- FileSystems.setDefaultPipelineOptions(pipelineOptions);
- }
- }
-
- /**
- * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
- * for user specified configuration injection into the ObjectMapper. This supports user custom
- * types on {@link PipelineOptions}.
- */
- private static ObjectMapper createMapper() {
- return new ObjectMapper().registerModules(
- ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
deleted file mode 100644
index 118ff99..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import com.datatorrent.common.util.FSStorageAgent;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.auto.service.AutoService;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Test;
-
-/**
- * Tests the serialization of PipelineOptions.
- */
-public class PipelineOptionsTest {
-
- /**
- * Interface for testing.
- */
- public interface MyOptions extends ApexPipelineOptions {
- @Description("Bla bla bla")
- @Default.String("Hello")
- String getTestOption();
- void setTestOption(String value);
- }
-
- private static class OptionsWrapper {
- private OptionsWrapper() {
- this(null); // required for Kryo
- }
- private OptionsWrapper(ApexPipelineOptions options) {
- this.options = new SerializablePipelineOptions(options);
- }
- @Bind(JavaSerializer.class)
- private final SerializablePipelineOptions options;
- }
-
- @Test
- public void testSerialization() {
- OptionsWrapper wrapper = new OptionsWrapper(
- PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class));
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- FSStorageAgent.store(bos, wrapper);
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis);
- assertNotNull(wrapperCopy.options);
- assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption());
- }
-
- @Test
- public void testSerializationWithUserCustomType() {
- OptionsWrapper wrapper = new OptionsWrapper(
- PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"")
- .as(JacksonIncompatibleOptions.class));
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- FSStorageAgent.store(bos, wrapper);
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis);
- assertNotNull(wrapperCopy.options);
- assertEquals("testValue",
- wrapperCopy.options.get().as(JacksonIncompatibleOptions.class)
- .getJacksonIncompatible().value);
- }
-
- /** PipelineOptions used to test auto registration of Jackson modules. */
- public interface JacksonIncompatibleOptions extends ApexPipelineOptions {
- JacksonIncompatible getJacksonIncompatible();
- void setJacksonIncompatible(JacksonIncompatible value);
- }
-
- /** A Jackson {@link Module} to test auto-registration of modules. */
- @AutoService(Module.class)
- public static class RegisteredTestModule extends SimpleModule {
- public RegisteredTestModule() {
- super("RegisteredTestModule");
- setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
- }
- }
-
- /** A class which Jackson does not know how to serialize/deserialize. */
- public static class JacksonIncompatible {
- private final String value;
- public JacksonIncompatible(String value) {
- this.value = value;
- }
- }
-
- /** A Jackson mixin used to add annotations to other classes. */
- @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
- @JsonSerialize(using = JacksonIncompatibleSerializer.class)
- public static final class JacksonIncompatibleMixin {}
-
- /** A Jackson deserializer for {@link JacksonIncompatible}. */
- public static class JacksonIncompatibleDeserializer extends
- JsonDeserializer<JacksonIncompatible> {
-
- @Override
- public JacksonIncompatible deserialize(JsonParser jsonParser,
- DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
- return new JacksonIncompatible(jsonParser.readValueAs(String.class));
- }
- }
-
- /** A Jackson serializer for {@link JacksonIncompatible}. */
- public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
-
- @Override
- public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
- jsonGenerator.writeString(jacksonIncompatible.value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index b85b5f5..1a52914 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -65,6 +65,21 @@
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java
new file mode 100644
index 0000000..e697fb2
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/**
+ * Holds a {@link PipelineOptions} in JSON serialized form and calls {@link
+ * FileSystems#setDefaultPipelineOptions(PipelineOptions)} on construction or on deserialization.
+ */
+public class SerializablePipelineOptions implements Serializable {
+ private static final ObjectMapper MAPPER =
+ new ObjectMapper()
+ .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
+ private final String serializedPipelineOptions;
+ private transient PipelineOptions options;
+
+ public SerializablePipelineOptions(PipelineOptions options) {
+ this.serializedPipelineOptions = serializeToJson(options);
+ this.options = options;
+ FileSystems.setDefaultPipelineOptions(options);
+ }
+
+ public PipelineOptions get() {
+ return options;
+ }
+
+ private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
+ is.defaultReadObject();
+ this.options = deserializeFromJson(serializedPipelineOptions);
+ // TODO https://issues.apache.org/jira/browse/BEAM-2712: remove this call.
+ FileSystems.setDefaultPipelineOptions(options);
+ }
+
+ private static String serializeToJson(PipelineOptions options) {
+ try {
+ return MAPPER.writeValueAsString(options);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException("Failed to serialize PipelineOptions", e);
+ }
+ }
+
+ private static PipelineOptions deserializeFromJson(String options) {
+ try {
+ return MAPPER.readValue(options, PipelineOptions.class);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to deserialize PipelineOptions", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java
new file mode 100644
index 0000000..cd470b2
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SerializablePipelineOptions}. */
+@RunWith(JUnit4.class)
+public class SerializablePipelineOptionsTest {
+ /** Options for testing. */
+ public interface MyOptions extends PipelineOptions {
+ String getFoo();
+
+ void setFoo(String foo);
+
+ @JsonIgnore
+ @Default.String("not overridden")
+ String getIgnoredField();
+
+ void setIgnoredField(String value);
+ }
+
+ @Test
+ public void testSerializationAndDeserialization() throws Exception {
+ PipelineOptions options =
+ PipelineOptionsFactory.fromArgs("--foo=testValue", "--ignoredField=overridden")
+ .as(MyOptions.class);
+
+ SerializablePipelineOptions serializableOptions = new SerializablePipelineOptions(options);
+ assertEquals("testValue", serializableOptions.get().as(MyOptions.class).getFoo());
+ assertEquals("overridden", serializableOptions.get().as(MyOptions.class).getIgnoredField());
+
+ SerializablePipelineOptions copy = SerializableUtils.clone(serializableOptions);
+ assertEquals("testValue", copy.get().as(MyOptions.class).getFoo());
+ assertEquals("not overridden", copy.get().as(MyOptions.class).getIgnoredField());
+ }
+
+ @Test
+ public void testIndependence() throws Exception {
+ SerializablePipelineOptions first =
+ new SerializablePipelineOptions(
+ PipelineOptionsFactory.fromArgs("--foo=first").as(MyOptions.class));
+ SerializablePipelineOptions firstCopy = SerializableUtils.clone(first);
+ SerializablePipelineOptions second =
+ new SerializablePipelineOptions(
+ PipelineOptionsFactory.fromArgs("--foo=second").as(MyOptions.class));
+ SerializablePipelineOptions secondCopy = SerializableUtils.clone(second);
+
+ assertEquals("first", first.get().as(MyOptions.class).getFoo());
+ assertEquals("first", firstCopy.get().as(MyOptions.class).getFoo());
+ assertEquals("second", second.get().as(MyOptions.class).getFoo());
+ assertEquals("second", secondCopy.get().as(MyOptions.class).getFoo());
+
+ first.get().as(MyOptions.class).setFoo("new first");
+ firstCopy.get().as(MyOptions.class).setFoo("new firstCopy");
+ second.get().as(MyOptions.class).setFoo("new second");
+ secondCopy.get().as(MyOptions.class).setFoo("new secondCopy");
+
+ assertEquals("new first", first.get().as(MyOptions.class).getFoo());
+ assertEquals("new firstCopy", firstCopy.get().as(MyOptions.class).getFoo());
+ assertEquals("new second", second.get().as(MyOptions.class).getFoo());
+ assertEquals("new secondCopy", secondCopy.get().as(MyOptions.class).getFoo());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index c063a2d..06746fd 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -256,16 +256,6 @@
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index d8ed622..3048168 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -22,9 +22,9 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -50,7 +50,7 @@ import org.apache.flink.util.Collector;
public class FlinkDoFnFunction<InputT, OutputT>
extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
- private final SerializedPipelineOptions serializedOptions;
+ private final SerializablePipelineOptions serializedOptions;
private final DoFn<InputT, OutputT> doFn;
private final String stepName;
@@ -75,7 +75,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
this.doFn = doFn;
this.stepName = stepName;
this.sideInputs = sideInputs;
- this.serializedOptions = new SerializedPipelineOptions(options);
+ this.serializedOptions = new SerializablePipelineOptions(options);
this.windowingStrategy = windowingStrategy;
this.outputMap = outputMap;
this.mainOutputTag = mainOutputTag;
@@ -101,7 +101,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
- serializedOptions.getPipelineOptions(), doFn,
+ serializedOptions.get(), doFn,
new FlinkSideInputReader(sideInputs, runtimeContext),
outputManager,
mainOutputTag,
@@ -109,7 +109,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
new FlinkNoOpStepContext(),
windowingStrategy);
- if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+ if ((serializedOptions.get().as(FlinkPipelineOptions.class))
.getEnableMetrics()) {
doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 13be913..c73dade 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.flink.translation.functions;
import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -47,7 +47,7 @@ public class FlinkMergingNonShuffleReduceFunction<
private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
- private final SerializedPipelineOptions serializedOptions;
+ private final SerializablePipelineOptions serializedOptions;
public FlinkMergingNonShuffleReduceFunction(
CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn,
@@ -60,7 +60,7 @@ public class FlinkMergingNonShuffleReduceFunction<
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+ this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
}
@@ -69,7 +69,7 @@ public class FlinkMergingNonShuffleReduceFunction<
Iterable<WindowedValue<KV<K, InputT>>> elements,
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
- PipelineOptions options = serializedOptions.getPipelineOptions();
+ PipelineOptions options = serializedOptions.get();
FlinkSideInputReader sideInputReader =
new FlinkSideInputReader(sideInputs, getRuntimeContext());
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index db12a49..49e821c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.flink.translation.functions;
import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -46,7 +46,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
protected final WindowingStrategy<Object, W> windowingStrategy;
- protected final SerializedPipelineOptions serializedOptions;
+ protected final SerializablePipelineOptions serializedOptions;
protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
@@ -59,7 +59,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
this.combineFn = combineFn;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+ this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
}
@@ -68,7 +68,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
Iterable<WindowedValue<KV<K, InputT>>> elements,
Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
- PipelineOptions options = serializedOptions.getPipelineOptions();
+ PipelineOptions options = serializedOptions.get();
FlinkSideInputReader sideInputReader =
new FlinkSideInputReader(sideInputs, getRuntimeContext());
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 53d71d8..6645b3a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.flink.translation.functions;
import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -48,7 +48,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
- protected final SerializedPipelineOptions serializedOptions;
+ protected final SerializablePipelineOptions serializedOptions;
public FlinkReduceFunction(
CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn,
@@ -61,7 +61,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+ this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
}
@@ -70,7 +70,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
Iterable<WindowedValue<KV<K, AccumT>>> elements,
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
- PipelineOptions options = serializedOptions.getPipelineOptions();
+ PipelineOptions options = serializedOptions.get();
FlinkSideInputReader sideInputReader =
new FlinkSideInputReader(sideInputs, getRuntimeContext());
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 11d4fee..412269c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -31,9 +31,9 @@ import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
@@ -61,7 +61,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
private String stepName;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
- private final SerializedPipelineOptions serializedOptions;
+ private final SerializablePipelineOptions serializedOptions;
private final Map<TupleTag<?>, Integer> outputMap;
private final TupleTag<OutputT> mainOutputTag;
private transient DoFnInvoker doFnInvoker;
@@ -79,7 +79,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
this.stepName = stepName;
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+ this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
this.outputMap = outputMap;
this.mainOutputTag = mainOutputTag;
}
@@ -118,7 +118,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
- serializedOptions.getPipelineOptions(), dofn,
+ serializedOptions.get(), dofn,
new FlinkSideInputReader(sideInputs, runtimeContext),
outputManager,
mainOutputTag,
@@ -135,7 +135,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
},
windowingStrategy);
- if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+ if ((serializedOptions.get().as(FlinkPipelineOptions.class))
.getEnableMetrics()) {
doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
deleted file mode 100644
index 40b6dd6..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation.utils;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
- private final byte[] serializedOptions;
-
- /** Lazily initialized copy of deserialized options. */
- private transient PipelineOptions pipelineOptions;
-
- public SerializedPipelineOptions(PipelineOptions options) {
- checkNotNull(options, "PipelineOptions must not be null.");
-
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- createMapper().writeValue(baos, options);
- this.serializedOptions = baos.toByteArray();
- } catch (Exception e) {
- throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
- }
-
- }
-
- public PipelineOptions getPipelineOptions() {
- if (pipelineOptions == null) {
- try {
- pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
-
- FileSystems.setDefaultPipelineOptions(pipelineOptions);
- } catch (IOException e) {
- throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
- }
- }
-
- return pipelineOptions;
- }
-
- /**
- * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
- * for user specified configuration injection into the ObjectMapper. This supports user custom
- * types on {@link PipelineOptions}.
- */
- private static ObjectMapper createMapper() {
- return new ObjectMapper().registerModules(
- ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 27e6912..3f9d601 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -19,9 +19,9 @@ package org.apache.beam.runners.flink.translation.wrappers;
import java.io.IOException;
import java.util.List;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,7 +50,7 @@ public class SourceInputFormat<T>
private final BoundedSource<T> initialSource;
private transient PipelineOptions options;
- private final SerializedPipelineOptions serializedOptions;
+ private final SerializablePipelineOptions serializedOptions;
private transient BoundedSource.BoundedReader<T> reader;
private boolean inputAvailable = false;
@@ -61,12 +61,12 @@ public class SourceInputFormat<T>
String stepName, BoundedSource<T> initialSource, PipelineOptions options) {
this.stepName = stepName;
this.initialSource = initialSource;
- this.serializedOptions = new SerializedPipelineOptions(options);
+ this.serializedOptions = new SerializablePipelineOptions(options);
}
@Override
public void configure(Configuration configuration) {
- options = serializedOptions.getPipelineOptions();
+ options = serializedOptions.get();
}
@Override
@@ -76,7 +76,7 @@ public class SourceInputFormat<T>
readerInvoker =
new ReaderInvocationUtil<>(
stepName,
- serializedOptions.getPipelineOptions(),
+ serializedOptions.get(),
metricContainer);
reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 7995ea8..62de423 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -47,10 +47,10 @@ import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
@@ -106,7 +106,7 @@ public class DoFnOperator<InputT, OutputT>
protected DoFn<InputT, OutputT> doFn;
- protected final SerializedPipelineOptions serializedOptions;
+ protected final SerializablePipelineOptions serializedOptions;
protected final TupleTag<OutputT> mainOutputTag;
protected final List<TupleTag<?>> additionalOutputTags;
@@ -174,7 +174,7 @@ public class DoFnOperator<InputT, OutputT>
this.additionalOutputTags = additionalOutputTags;
this.sideInputTagMapping = sideInputTagMapping;
this.sideInputs = sideInputs;
- this.serializedOptions = new SerializedPipelineOptions(options);
+ this.serializedOptions = new SerializablePipelineOptions(options);
this.windowingStrategy = windowingStrategy;
this.outputManagerFactory = outputManagerFactory;
@@ -256,7 +256,7 @@ public class DoFnOperator<InputT, OutputT>
org.apache.beam.runners.core.StepContext stepContext = createStepContext();
doFnRunner = DoFnRunners.simpleRunner(
- serializedOptions.getPipelineOptions(),
+ serializedOptions.get(),
doFn,
sideInputReader,
outputManager,
@@ -301,7 +301,7 @@ public class DoFnOperator<InputT, OutputT>
stateCleaner);
}
- if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+ if ((serializedOptions.get().as(FlinkPipelineOptions.class))
.getEnableMetrics()) {
doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 2f095d4..be758a6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -115,7 +115,7 @@ public class SplittableDoFnOperator<
((ProcessFn) doFn).setProcessElementInvoker(
new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
doFn,
- serializedOptions.getPipelineOptions(),
+ serializedOptions.get(),
new OutputWindowedValue<OutputT>() {
@Override
public void outputWindowedValue(
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 6d75688..5ddc46f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -20,9 +20,9 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -48,7 +48,7 @@ public class BoundedSourceWrapper<OutputT>
/**
* Keep the options so that we can initialize the readers.
*/
- private final SerializedPipelineOptions serializedOptions;
+ private final SerializablePipelineOptions serializedOptions;
/**
* The split sources. We split them in the constructor to ensure that all parallel
@@ -74,7 +74,7 @@ public class BoundedSourceWrapper<OutputT>
BoundedSource<OutputT> source,
int parallelism) throws Exception {
this.stepName = stepName;
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+ this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
@@ -109,13 +109,13 @@ public class BoundedSourceWrapper<OutputT>
ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker =
new ReaderInvocationUtil<>(
stepName,
- serializedOptions.getPipelineOptions(),
+ serializedOptions.get(),
metricContainer);
readers = new ArrayList<>();
// initialize readers from scratch
for (BoundedSource<OutputT> source : localSources) {
- readers.add(source.createReader(serializedOptions.getPipelineOptions()));
+ readers.add(source.createReader(serializedOptions.get()));
}
if (readers.size() == 1) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index e75072a..817dd74 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
@@ -72,7 +72,7 @@ public class UnboundedSourceWrapper<
/**
* Keep the options so that we can initialize the localReaders.
*/
- private final SerializedPipelineOptions serializedOptions;
+ private final SerializablePipelineOptions serializedOptions;
/**
* For snapshot and restore.
@@ -141,7 +141,7 @@ public class UnboundedSourceWrapper<
UnboundedSource<OutputT, CheckpointMarkT> source,
int parallelism) throws Exception {
this.stepName = stepName;
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+ this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
if (source.requiresDeduping()) {
LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source);
@@ -189,7 +189,7 @@ public class UnboundedSourceWrapper<
stateForCheckpoint.get()) {
localSplitSources.add(restored.getKey());
localReaders.add(restored.getKey().createReader(
- serializedOptions.getPipelineOptions(), restored.getValue()));
+ serializedOptions.get(), restored.getValue()));
}
} else {
// initialize localReaders and localSources from scratch
@@ -198,7 +198,7 @@ public class UnboundedSourceWrapper<
UnboundedSource<OutputT, CheckpointMarkT> source =
splitSources.get(i);
UnboundedSource.UnboundedReader<OutputT> reader =
- source.createReader(serializedOptions.getPipelineOptions(), null);
+ source.createReader(serializedOptions.get(), null);
localSplitSources.add(source);
localReaders.add(reader);
}
@@ -221,7 +221,7 @@ public class UnboundedSourceWrapper<
ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker =
new ReaderInvocationUtil<>(
stepName,
- serializedOptions.getPipelineOptions(),
+ serializedOptions.get(),
metricContainer);
if (localReaders.size() == 0) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index d0281ec..eb06026 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -17,32 +17,8 @@
*/
package org.apache.beam.runners.flink;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.auto.service.AutoService;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashMap;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.Default;
@@ -60,12 +36,10 @@ import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.joda.time.Instant;
import org.junit.Assert;
-import org.junit.BeforeClass;
import org.junit.Test;
/**
@@ -73,9 +47,7 @@ import org.junit.Test;
*/
public class PipelineOptionsTest {
- /**
- * Pipeline options.
- */
+ /** Pipeline options. */
public interface MyOptions extends FlinkPipelineOptions {
@Description("Bla bla bla")
@Default.String("Hello")
@@ -83,60 +55,12 @@ public class PipelineOptionsTest {
void setTestOption(String value);
}
- private static MyOptions options;
- private static SerializedPipelineOptions serializedOptions;
-
- private static final String[] args = new String[]{"--testOption=nothing"};
-
- @BeforeClass
- public static void beforeTest() {
- options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
- serializedOptions = new SerializedPipelineOptions(options);
- }
-
- @Test
- public void testDeserialization() {
- MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
- assertEquals("nothing", deserializedOptions.getTestOption());
- }
-
- @Test
- public void testIgnoredFieldSerialization() {
- FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
- options.setStateBackend(new MemoryStateBackend());
-
- FlinkPipelineOptions deserialized =
- new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class);
-
- assertNull(deserialized.getStateBackend());
- }
-
- @Test
- public void testEnableMetrics() {
- FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
- options.setEnableMetrics(false);
- assertFalse(options.getEnableMetrics());
- }
-
- @Test
- public void testCaching() {
- PipelineOptions deserializedOptions =
- serializedOptions.getPipelineOptions().as(PipelineOptions.class);
-
- assertNotNull(deserializedOptions);
- assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
- assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
- assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
- }
-
- @Test(expected = Exception.class)
- public void testNonNull() {
- new SerializedPipelineOptions(null);
- }
+ private static MyOptions options =
+ PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class);
@Test(expected = Exception.class)
public void parDoBaseClassPipelineOptionsNullTest() {
- DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ new DoFnOperator<>(
new TestDoFn(),
"stepName",
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
@@ -196,18 +120,7 @@ public class PipelineOptionsTest {
}
- @Test
- public void testExternalizedCheckpointsConfigs() {
- String[] args = new String[] { "--externalizedCheckpointsEnabled=true",
- "--retainExternalizedCheckpointsOnCancellation=false" };
- final FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
- .as(FlinkPipelineOptions.class);
- assertEquals(options.isExternalizedCheckpointsEnabled(), true);
- assertEquals(options.getRetainExternalizedCheckpointsOnCancellation(), false);
- }
-
private static class TestDoFn extends DoFn<String, String> {
-
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Assert.assertNotNull(c.getPipelineOptions());
@@ -216,74 +129,4 @@ public class PipelineOptionsTest {
c.getPipelineOptions().as(MyOptions.class).getTestOption());
}
}
-
- /** PipelineOptions used to test auto registration of Jackson modules. */
- public interface JacksonIncompatibleOptions extends PipelineOptions {
- JacksonIncompatible getJacksonIncompatible();
- void setJacksonIncompatible(JacksonIncompatible value);
- }
-
- /** A Jackson {@link Module} to test auto-registration of modules. */
- @AutoService(Module.class)
- public static class RegisteredTestModule extends SimpleModule {
- public RegisteredTestModule() {
- super("RegisteredTestModule");
- setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
- }
- }
-
- /** A class which Jackson does not know how to serialize/deserialize. */
- public static class JacksonIncompatible {
- private final String value;
- public JacksonIncompatible(String value) {
- this.value = value;
- }
- }
-
- /** A Jackson mixin used to add annotations to other classes. */
- @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
- @JsonSerialize(using = JacksonIncompatibleSerializer.class)
- public static final class JacksonIncompatibleMixin {}
-
- /** A Jackson deserializer for {@link JacksonIncompatible}. */
- public static class JacksonIncompatibleDeserializer extends
- JsonDeserializer<JacksonIncompatible> {
-
- @Override
- public JacksonIncompatible deserialize(JsonParser jsonParser,
- DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
- return new JacksonIncompatible(jsonParser.readValueAs(String.class));
- }
- }
-
- /** A Jackson serializer for {@link JacksonIncompatible}. */
- public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
-
- @Override
- public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
- jsonGenerator.writeString(jacksonIncompatible.value);
- }
- }
-
- @Test
- public void testSerializingPipelineOptionsWithCustomUserType() throws Exception {
- String expectedValue = "testValue";
- PipelineOptions options = PipelineOptionsFactory
- .fromArgs("--jacksonIncompatible=\"" + expectedValue + "\"")
- .as(JacksonIncompatibleOptions.class);
- SerializedPipelineOptions context = new SerializedPipelineOptions(options);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {
- outputStream.writeObject(context);
- }
- try (ObjectInputStream inputStream =
- new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
- SerializedPipelineOptions copy = (SerializedPipelineOptions) inputStream.readObject();
- assertEquals(expectedValue,
- copy.getPipelineOptions().as(JacksonIncompatibleOptions.class)
- .getJacksonIncompatible().value);
- }
- }
}
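
[Editor's sketch, not part of the commit: the serialization tests removed above move out of the Flink runner. A round-trip check against the new class could look roughly like the following, assuming only the constructor and get() visible in these diffs; the actual SerializablePipelineOptionsTest in core-construction may differ.]

import static org.junit.Assert.assertEquals;

import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.Test;

public class SerializablePipelineOptionsRoundTripSketch {
  /** Illustrative options interface, mirroring the MyOptions interface in the test above. */
  public interface MyOptions extends PipelineOptions {
    String getTestOption();
    void setTestOption(String value);
  }

  @Test
  public void optionsSurviveJavaSerialization() {
    MyOptions options =
        PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class);

    // Java-serialize and deserialize the wrapper, then unwrap and check the value.
    SerializablePipelineOptions copy =
        SerializationUtils.clone(new SerializablePipelineOptions(options));
    assertEquals("nothing", copy.get().as(MyOptions.class).getTestOption());
  }
}
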
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e823060..b2e7fe4 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -35,7 +35,6 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<kafka.version>0.9.0.1</kafka.version>
- <jackson.version>2.4.4</jackson.version>
<dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
</properties>
@@ -184,18 +183,7 @@
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 27f2ec8..a9f2c44 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -19,18 +19,11 @@
package org.apache.beam.runners.spark.aggregators;
import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
/**
@@ -52,17 +45,6 @@ public class NamedAggregators implements Serializable {
}
/**
- * Constructs a new named aggregators instance that contains a mapping from the specified
- * `named` to the associated initial state.
- *
- * @param name Name of aggregator.
- * @param state Associated State.
- */
- public NamedAggregators(String name, State<?, ?, ?> state) {
- this.mNamedAggregators.put(name, state);
- }
-
- /**
* @param name Name of aggregator to retrieve.
* @param typeClass Type class to cast the value to.
* @param <T> Type to be returned.
@@ -152,79 +134,4 @@ public class NamedAggregators implements Serializable {
Combine.CombineFn<InputT, InterT, OutputT> getCombineFn();
}
- /**
- * @param <InputT> Input data type
- * @param <InterT> Intermediate data type (useful for averages)
- * @param <OutputT> Output data type
- */
- public static class CombineFunctionState<InputT, InterT, OutputT>
- implements State<InputT, InterT, OutputT> {
-
- private Combine.CombineFn<InputT, InterT, OutputT> combineFn;
- private Coder<InputT> inCoder;
- private SparkRuntimeContext ctxt;
- private transient InterT state;
-
- public CombineFunctionState(
- Combine.CombineFn<InputT, InterT, OutputT> combineFn,
- Coder<InputT> inCoder,
- SparkRuntimeContext ctxt) {
- this.combineFn = combineFn;
- this.inCoder = inCoder;
- this.ctxt = ctxt;
- this.state = combineFn.createAccumulator();
- }
-
- @Override
- public void update(InputT element) {
- combineFn.addInput(state, element);
- }
-
- @Override
- public State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other) {
- this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), other.current()));
- return this;
- }
-
- @Override
- public InterT current() {
- return state;
- }
-
- @Override
- public OutputT render() {
- return combineFn.extractOutput(state);
- }
-
- @Override
- public Combine.CombineFn<InputT, InterT, OutputT> getCombineFn() {
- return combineFn;
- }
-
- private void writeObject(ObjectOutputStream oos) throws IOException {
- oos.writeObject(ctxt);
- oos.writeObject(combineFn);
- oos.writeObject(inCoder);
- try {
- combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
- .encode(state, oos);
- } catch (CannotProvideCoderException e) {
- throw new IllegalStateException("Could not determine coder for accumulator", e);
- }
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
- ctxt = (SparkRuntimeContext) ois.readObject();
- combineFn = (Combine.CombineFn<InputT, InterT, OutputT>) ois.readObject();
- inCoder = (Coder<InputT>) ois.readObject();
- try {
- state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
- .decode(ois);
- } catch (CannotProvideCoderException e) {
- throw new IllegalStateException("Could not determine coder for accumulator", e);
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 20aca5f..b7000b4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.spark.io;
import static com.google.common.base.Preconditions.checkArgument;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.spark.api.java.JavaSparkContext$;
@@ -58,7 +58,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
private static final Logger LOG = LoggerFactory.getLogger(SourceDStream.class);
private final UnboundedSource<T, CheckpointMarkT> unboundedSource;
- private final SparkRuntimeContext runtimeContext;
+ private final SerializablePipelineOptions options;
private final Duration boundReadDuration;
// Reader cache interval to expire readers if they haven't been accessed in the last microbatch.
// The reason we expire readers is that upon executor death/addition source split ownership can be
@@ -81,20 +81,20 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
SourceDStream(
StreamingContext ssc,
UnboundedSource<T, CheckpointMarkT> unboundedSource,
- SparkRuntimeContext runtimeContext,
+ SerializablePipelineOptions options,
Long boundMaxRecords) {
super(ssc, JavaSparkContext$.MODULE$.<scala.Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag());
this.unboundedSource = unboundedSource;
- this.runtimeContext = runtimeContext;
+ this.options = options;
- SparkPipelineOptions options = runtimeContext.getPipelineOptions().as(
+ SparkPipelineOptions sparkOptions = options.get().as(
SparkPipelineOptions.class);
// Reader cache expiration interval. 50% of batch interval is added to accommodate latency.
- this.readerCacheInterval = 1.5 * options.getBatchIntervalMillis();
+ this.readerCacheInterval = 1.5 * sparkOptions.getBatchIntervalMillis();
- this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(),
- options.getMinReadTimeMillis());
+ this.boundReadDuration = boundReadDuration(sparkOptions.getReadTimePercentage(),
+ sparkOptions.getMinReadTimeMillis());
// set initial parallelism once.
this.initialParallelism = ssc().sparkContext().defaultParallelism();
checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero.");
@@ -104,7 +104,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
try {
this.numPartitions =
createMicrobatchSource()
- .split(options)
+ .split(sparkOptions)
.size();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -116,7 +116,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd =
new SourceRDD.Unbounded<>(
ssc().sparkContext(),
- runtimeContext,
+ options,
createMicrobatchSource(),
numPartitions);
return scala.Option.apply(rdd);
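
[Editor's sketch, not part of the commit: the Spark translation code now threads options the same way, wrapping once where the DStream/RDD is built and unwrapping per use with get().as(...). Only SerializablePipelineOptions, get(), SparkPipelineOptions, and getBatchIntervalMillis() come from the hunks; the rest is illustrative.]

import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SparkOptionsThreadingSketch {
  public static void main(String[] args) {
    SparkPipelineOptions driverSide = PipelineOptionsFactory.as(SparkPipelineOptions.class);

    // Wrapped once where the source DStream/RDD is constructed...
    SerializablePipelineOptions shipped = new SerializablePipelineOptions(driverSide);

    // ...and unwrapped (re-cast to the runner-specific view) wherever it is consumed.
    SparkPipelineOptions workerSide = shipped.get().as(SparkPipelineOptions.class);
    System.out.println(workerSide.getBatchIntervalMillis());
  }
}
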
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 01cc176..a225e0f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -28,9 +28,9 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -66,7 +66,7 @@ public class SourceRDD {
private static final Logger LOG = LoggerFactory.getLogger(SourceRDD.Bounded.class);
private final BoundedSource<T> source;
- private final SparkRuntimeContext runtimeContext;
+ private final SerializablePipelineOptions options;
private final int numPartitions;
private final String stepName;
private final Accumulator<MetricsContainerStepMap> metricsAccum;
@@ -79,11 +79,11 @@ public class SourceRDD {
public Bounded(
SparkContext sc,
BoundedSource<T> source,
- SparkRuntimeContext runtimeContext,
+ SerializablePipelineOptions options,
String stepName) {
super(sc, NIL, JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
this.source = source;
- this.runtimeContext = runtimeContext;
+ this.options = options;
// the input parallelism is determined by Spark's scheduler backend.
// when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
// when running on Mesos it's 8.
@@ -103,14 +103,14 @@ public class SourceRDD {
long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
try {
desiredSizeBytes = source.getEstimatedSizeBytes(
- runtimeContext.getPipelineOptions()) / numPartitions;
+ options.get()) / numPartitions;
} catch (Exception e) {
LOG.warn("Failed to get estimated bundle size for source {}, using default bundle "
+ "size of {} bytes.", source, DEFAULT_BUNDLE_SIZE);
}
try {
List<? extends Source<T>> partitionedSources = source.split(desiredSizeBytes,
- runtimeContext.getPipelineOptions());
+ options.get());
Partition[] partitions = new SourcePartition[partitionedSources.size()];
for (int i = 0; i < partitionedSources.size(); i++) {
partitions[i] = new SourcePartition<>(id(), i, partitionedSources.get(i));
@@ -125,7 +125,7 @@ public class SourceRDD {
private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) {
try {
return ((BoundedSource<T>) partition.source).createReader(
- runtimeContext.getPipelineOptions());
+ options.get());
} catch (IOException e) {
throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
}
@@ -293,7 +293,7 @@ public class SourceRDD {
UnboundedSource.CheckpointMark> extends RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> {
private final MicrobatchSource<T, CheckpointMarkT> microbatchSource;
- private final SparkRuntimeContext runtimeContext;
+ private final SerializablePipelineOptions options;
private final Partitioner partitioner;
// to satisfy Scala API.
@@ -302,12 +302,12 @@ public class SourceRDD {
.asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList();
public Unbounded(SparkContext sc,
- SparkRuntimeContext runtimeContext,
+ SerializablePipelineOptions options,
MicrobatchSource<T, CheckpointMarkT> microbatchSource,
int initialNumPartitions) {
super(sc, NIL,
JavaSparkContext$.MODULE$.<scala.Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag());
- this.runtimeContext = runtimeContext;
+ this.options = options;
this.microbatchSource = microbatchSource;
this.partitioner = new HashPartitioner(initialNumPartitions);
}
@@ -316,7 +316,7 @@ public class SourceRDD {
public Partition[] getPartitions() {
try {
final List<? extends Source<T>> partitionedSources =
- microbatchSource.split(runtimeContext.getPipelineOptions());
+ microbatchSource.split(options.get());
final Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
for (int i = 0; i < partitionedSources.size(); i++) {
partitions[i] =
http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 7106c73..b31aa9f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -22,12 +22,12 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
@@ -80,11 +80,11 @@ public class SparkUnboundedSource {
public static <T, CheckpointMarkT extends CheckpointMark> UnboundedDataset<T> read(
JavaStreamingContext jssc,
- SparkRuntimeContext rc,
+ SerializablePipelineOptions rc,
UnboundedSource<T, CheckpointMarkT> source,
String stepName) {
- SparkPipelineOptions options = rc.getPipelineOptions().as(SparkPipelineOptions.class);
+ SparkPipelineOptions options = rc.get().as(SparkPipelineOptions.class);
Long maxRecordsPerBatch = options.getMaxRecordsPerBatch();
SourceDStream<T, CheckpointMarkT> sourceDStream =
new SourceDStream<>(jssc.ssc(), source, rc, maxRecordsPerBatch);