You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:26 UTC
[08/39] incubator-beam git commit: BEAM-261 Enable checkstyle and
cleanup.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
new file mode 100644
index 0000000..ecb0adb
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translators.functions;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
index 6ee82ea..3188dfa 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -18,6 +18,17 @@
package org.apache.beam.runners.apex.translators.io;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.DataTuple;
@@ -26,27 +37,15 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
-
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.google.common.base.Throwables;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-
-import java.io.IOException;
-
/**
* Apex input operator that wraps Beam {@link UnboundedSource}.
*/
-public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
- implements InputOperator {
+public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
+ extends UnboundedSource.CheckpointMark> implements InputOperator {
private static final Logger LOG = LoggerFactory.getLogger(
ApexReadUnboundedInputOperator.class);
private boolean traceTuples = false;
@@ -58,10 +57,12 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
private final UnboundedSource<OutputT, CheckpointMarkT> source;
private transient UnboundedSource.UnboundedReader<OutputT> reader;
private transient boolean available = false;
- @OutputPortFieldAnnotation(optional=true)
- public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = new DefaultOutputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output =
+ new DefaultOutputPort<>();
- public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, ApexPipelineOptions options) {
+ public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
+ ApexPipelineOptions options) {
this.pipelineOptions = new SerializablePipelineOptions(options);
this.source = source;
}
@@ -72,8 +73,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
}
@Override
- public void beginWindow(long windowId)
- {
+ public void beginWindow(long windowId) {
if (!available && source instanceof ValuesSource) {
// if it's a Create and the input was consumed, emit final watermark
emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
@@ -95,37 +95,33 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
}
@Override
- public void endWindow()
- {
+ public void endWindow() {
}
@Override
- public void setup(OperatorContext context)
- {
+ public void setup(OperatorContext context) {
this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
try {
reader = source.createReader(this.pipelineOptions.get(), null);
available = reader.start();
} catch (IOException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@Override
- public void teardown()
- {
+ public void teardown() {
try {
if (reader != null) {
reader.close();
}
} catch (IOException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@Override
- public void emitTuples()
- {
+ public void emitTuples() {
try {
if (!available) {
available = reader.advance();
@@ -141,7 +137,8 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
}
} catch (Exception e) {
- Throwables.propagate(e);
+ Throwables.propagateIfPossible(e);
+ throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
index 2c4b298..fadf8ec4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ValuesSource.java
@@ -18,16 +18,6 @@
package org.apache.beam.runners.apex.translators.io;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
-import com.google.common.base.Throwables;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -37,8 +27,15 @@ import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
/**
- * unbounded source that reads from a Java {@link Iterable}.
+ * Unbounded source that reads from a Java {@link Iterable}.
*/
public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
private static final long serialVersionUID = 1L;
@@ -52,7 +49,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
try {
iterableCoder.encode(values, bos, Context.OUTER);
} catch (IOException ex) {
- Throwables.propagate(ex);
+ throw new RuntimeException(ex);
}
this.codedValues = bos.toByteArray();
}
@@ -71,7 +68,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER);
return new ValuesReader<>(values, this);
} catch (IOException ex) {
- throw Throwables.propagate(ex);
+ throw new RuntimeException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
new file mode 100644
index 0000000..0d17f19
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translators.io;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
new file mode 100644
index 0000000..7d7c6cc
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translators;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index c9bf6dc..a260a66 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.apex.translators.utils;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.datatorrent.api.Operator;
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -32,24 +34,25 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StandardCoder;
-import com.datatorrent.api.Operator;
-
-public interface ApexStreamTuple<T>
-{
+/**
+ * The common interface for all objects transmitted through streams.
+ *
+ * @param <T> The actual payload type.
+ */
+public interface ApexStreamTuple<T> {
/**
- * Gets the value of the tuple
+ * Gets the value of the tuple.
*
* @return
*/
T getValue();
/**
- * Plain tuple class
+ * Data tuple class.
*
* @param <T>
*/
- class DataTuple<T> implements ApexStreamTuple<T>
- {
+ class DataTuple<T> implements ApexStreamTuple<T> {
private int unionTag;
private T value;
@@ -57,86 +60,73 @@ public interface ApexStreamTuple<T>
return new DataTuple<>(value, 0);
}
- private DataTuple(T value, int unionTag)
- {
+ private DataTuple(T value, int unionTag) {
this.value = value;
this.unionTag = unionTag;
}
@Override
- public T getValue()
- {
+ public T getValue() {
return value;
}
- public void setValue(T value)
- {
+ public void setValue(T value) {
this.value = value;
}
- public int getUnionTag()
- {
+ public int getUnionTag() {
return unionTag;
}
- public void setUnionTag(int unionTag)
- {
+ public void setUnionTag(int unionTag) {
this.unionTag = unionTag;
}
@Override
- public String toString()
- {
+ public String toString() {
return value.toString();
}
}
/**
- * Tuple that includes a timestamp
+ * Tuple that includes a timestamp.
*
* @param <T>
*/
- class TimestampedTuple<T> extends DataTuple<T>
- {
+ class TimestampedTuple<T> extends DataTuple<T> {
private long timestamp;
- public TimestampedTuple(long timestamp, T value)
- {
+ public TimestampedTuple(long timestamp, T value) {
super(value, 0);
this.timestamp = timestamp;
}
- public long getTimestamp()
- {
+ public long getTimestamp() {
return timestamp;
}
- public void setTimestamp(long timestamp)
- {
+ public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
}
/**
- * Tuple that represents a watermark
+ * Tuple that represents a watermark.
*
* @param <T>
*/
- class WatermarkTuple<T> extends TimestampedTuple<T>
- {
+ class WatermarkTuple<T> extends TimestampedTuple<T> {
public static <T> WatermarkTuple<T> of(long timestamp) {
return new WatermarkTuple<>(timestamp);
}
- protected WatermarkTuple(long timestamp)
- {
+ protected WatermarkTuple(long timestamp) {
super(timestamp, null);
}
@Override
- public String toString()
- {
+ public String toString() {
return "[Watermark " + getTimestamp() + "]";
}
}
@@ -161,18 +151,17 @@ public interface ApexStreamTuple<T>
throws CoderException, IOException {
if (value instanceof WatermarkTuple) {
outStream.write(1);
- new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>)value).getTimestamp());
+ new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp());
} else {
outStream.write(0);
- outStream.write(((DataTuple<?>)value).unionTag);
+ outStream.write(((DataTuple<?>) value).unionTag);
valueCoder.encode(value.getValue(), outStream, context);
}
}
@Override
public ApexStreamTuple<T> decode(InputStream inStream, Context context)
- throws CoderException, IOException
- {
+ throws CoderException, IOException {
int b = inStream.read();
if (b == 1) {
return new WatermarkTuple<T>(new DataInputStream(inStream).readLong());
@@ -183,14 +172,12 @@ public interface ApexStreamTuple<T>
}
@Override
- public List<? extends Coder<?>> getCoderArguments()
- {
+ public List<? extends Coder<?>> getCoderArguments() {
return Arrays.<Coder<?>>asList(valueCoder);
}
@Override
- public void verifyDeterministic() throws NonDeterministicException
- {
+ public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
this.getClass().getSimpleName() + " requires a deterministic valueCoder",
valueCoder);
@@ -205,10 +192,12 @@ public interface ApexStreamTuple<T>
}
- final class Logging
- {
- public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator)
- {
+ /**
+ * Central if data tuples received on and emitted from ports should be logged.
+ * Should be called in setup and value cached in operator.
+ */
+ final class Logging {
+ public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) {
return options.isTupleTracingEnabled();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
index c18765b..61e3b83 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/CoderAdapterStreamCodec.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.apex.translators.utils;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.netlet.util.Slice;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -25,15 +28,10 @@ import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
-import com.datatorrent.api.StreamCodec;
-import com.datatorrent.netlet.util.Slice;
-import com.google.common.base.Throwables;
-
/**
* The Apex {@link StreamCodec} adapter for using Beam {@link Coder}.
*/
public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializable {
-
private static final long serialVersionUID = 1L;
private final Coder<? super Object> coder;
@@ -42,31 +40,29 @@ public class CoderAdapterStreamCodec implements StreamCodec<Object>, Serializabl
}
@Override
- public Object fromByteArray(Slice fragment)
- {
- ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length);
+ public Object fromByteArray(Slice fragment) {
+ ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset,
+ fragment.length);
try {
return coder.decode(bis, Context.OUTER);
} catch (IOException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@Override
- public Slice toByteArray(Object wv)
- {
+ public Slice toByteArray(Object wv) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
coder.encode(wv, bos, Context.OUTER);
} catch (IOException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
return new Slice(bos.toByteArray());
}
@Override
- public int getPartition(Object o)
- {
+ public int getPartition(Object o) {
return o.hashCode();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
index 43d92f6..3b19c37 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpStepContext.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.apex.translators.utils;
+import java.io.IOException;
+import java.io.Serializable;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ExecutionContext;
@@ -25,14 +28,10 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.TupleTag;
-import java.io.IOException;
-import java.io.Serializable;
-
/**
* Serializable {@link ExecutionContext.StepContext} that does nothing.
*/
public class NoOpStepContext implements ExecutionContext.StepContext, Serializable {
-
private static final long serialVersionUID = 1L;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
index 7f7b3ef..d32b869 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/SerializablePipelineOptions.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.apex.translators.utils;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
@@ -25,37 +27,34 @@ import java.io.ObjectOutput;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
/**
- * A wrapper to enable serialization of {@link PipelineOptions}
+ * A wrapper to enable serialization of {@link PipelineOptions}.
*/
public class SerializablePipelineOptions implements Externalizable {
private transient ApexPipelineOptions pipelineOptions;
-
+
public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
this.pipelineOptions = pipelineOptions;
}
public SerializablePipelineOptions() {
}
-
+
public ApexPipelineOptions get() {
return this.pipelineOptions;
}
-
+
@Override
- public void writeExternal(ObjectOutput out) throws IOException
- {
+ public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(new ObjectMapper().writeValueAsString(pipelineOptions));
}
@Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
- {
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
String s = in.readUTF();
- this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class).as(ApexPipelineOptions.class);
+ this.pipelineOptions = new ObjectMapper().readValue(s, PipelineOptions.class)
+ .as(ApexPipelineOptions.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
index 2de737d..c06c500 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
@@ -17,26 +17,24 @@
*/
package org.apache.beam.runners.apex.translators.utils;
-import java.io.IOException;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Throwables;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
/**
* A {@link KryoSerializable} holder that uses the specified {@link Coder}.
* @param <T>
*/
-public class ValueAndCoderKryoSerializable<T> implements KryoSerializable
-{
- private static JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable {
+ private static final JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
private T value;
private Coder<T> coder;
@@ -54,27 +52,25 @@ public class ValueAndCoderKryoSerializable<T> implements KryoSerializable
}
@Override
- public void write(Kryo kryo, Output output)
- {
+ public void write(Kryo kryo, Output output) {
try {
kryo.writeClass(output, coder.getClass());
kryo.writeObject(output, coder, JAVA_SERIALIZER);
coder.encode(value, output, Context.OUTER);
} catch (IOException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@Override
- public void read(Kryo kryo, Input input)
- {
+ public void read(Kryo kryo, Input input) {
try {
@SuppressWarnings("unchecked")
Class<Coder<T>> type = kryo.readClass(input).getType();
coder = kryo.readObject(input, type, JAVA_SERIALIZER);
value = coder.decode(input, Context.OUTER);
} catch (IOException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java
new file mode 100644
index 0000000..4aeba35
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.translators.utils;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
deleted file mode 100644
index 3573d31..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.examples;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.TestApexRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * For debugging only.
- */
-@Ignore
-@RunWith(JUnit4.class)
-public class IntTest implements java.io.Serializable
-{
-
- @Test
- public void test()
- {
- ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
- options.setTupleTracingEnabled(true);
- options.setRunner(TestApexRunner.class);
- Pipeline p = Pipeline.create(options);
-boolean timeBound = false;
-
-
- TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting();
-//List<KV<Integer,Integer>> values = Lists.newArrayList(
-// KV.of(0, 99),KV.of(0, 99),KV.of(0, 98));
-
-//UnboundedSource<KV<Integer,Integer>, ?> source = new ValuesSource<>(values,
-// KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
-
- if (true) {
- source = source.withDedup();
- }
-
- PCollection<KV<Integer, Integer>> output =
- timeBound
- ? p.apply(Read.from(source).withMaxReadTime(Duration.millis(200)))
- : p.apply(Read.from(source).withMaxNumRecords(NUM_RECORDS));
-
- List<KV<Integer, Integer>> expectedOutput = new ArrayList<>();
- for (int i = 0; i < NUM_RECORDS; i++) {
- expectedOutput.add(KV.of(0, i));
- }
-
- // Because some of the NUM_RECORDS elements read are dupes, the final output
- // will only have output from 0 to n where n < NUM_RECORDS.
- PAssert.that(output).satisfies(new Checker(true, timeBound));
-
-
- p.run();
- return;
- }
-
- private static final int NUM_RECORDS = 10;
- private static class Checker implements SerializableFunction<Iterable<KV<Integer, Integer>>, Void>
- {
- private final boolean dedup;
- private final boolean timeBound;
-
- Checker(boolean dedup, boolean timeBound)
- {
- this.dedup = dedup;
- this.timeBound = timeBound;
- }
-
- @Override
- public Void apply(Iterable<KV<Integer, Integer>> input)
- {
- List<Integer> values = new ArrayList<>();
- for (KV<Integer, Integer> kv : input) {
- assertEquals(0, (int)kv.getKey());
- values.add(kv.getValue());
- }
- if (timeBound) {
- assertTrue(values.size() >= 1);
- } else if (dedup) {
- // Verify that at least some data came through. The chance of 90% of the input
- // being duplicates is essentially zero.
- assertTrue(values.size() > NUM_RECORDS / 10 && values.size() <= NUM_RECORDS);
- } else {
- assertEquals(NUM_RECORDS, values.size());
- }
- Collections.sort(values);
- for (int i = 0; i < values.size(); i++) {
- assertEquals(i, (int)values.get(i));
- }
- //if (finalizeTracker != null) {
- // assertThat(finalizeTracker, containsInAnyOrder(values.size() - 1));
- //}
- return null;
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
index 582d839..6ab2e8e 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
@@ -21,8 +21,8 @@ package org.apache.beam.runners.apex.examples;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,8 +75,8 @@ public class StreamingWordCountTest {
@ProcessElement
public void processElement(ProcessContext c) {
- String row = c.element().getKey() + " - " + c.element().getValue() +
- " @ " + c.timestamp().toString();
+ String row = c.element().getKey() + " - " + c.element().getValue()
+ + " @ " + c.timestamp().toString();
LOG.debug("output {}", row);
c.output(row);
RESULTS.put(c.element().getKey(), c.element().getValue());
@@ -103,17 +102,19 @@ public class StreamingWordCountTest {
wordCounts.apply(ParDo.of(new FormatAsStringFn()));
- ApexRunnerResult result = (ApexRunnerResult)p.run();
+ ApexRunnerResult result = (ApexRunnerResult) p.run();
Assert.assertNotNull(result.getApexDAG().getOperatorMeta("Read(UnboundedTextSource)"));
long timeout = System.currentTimeMillis() + 30000;
while (System.currentTimeMillis() < timeout) {
- if (FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar")) {
+ if (FormatAsStringFn.RESULTS.containsKey("foo")
+ && FormatAsStringFn.RESULTS.containsKey("bar")) {
break;
}
Thread.sleep(1000);
}
result.cancel();
- Assert.assertTrue(FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar"));
+ Assert.assertTrue(
+ FormatAsStringFn.RESULTS.containsKey("foo") && FormatAsStringFn.RESULTS.containsKey("bar"));
FormatAsStringFn.RESULTS.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
index 29351e9..8132ee5 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
@@ -18,14 +18,6 @@
package org.apache.beam.runners.apex.examples;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.Instant;
-
-import com.google.common.base.Throwables;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
@@ -34,6 +26,12 @@ import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
/**
* unbounded source that reads from text.
*/
@@ -102,7 +100,7 @@ public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource
try {
Thread.sleep(index); // allow for downstream processing to complete
} catch (InterruptedException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java
new file mode 100644
index 0000000..4308c80
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of the Beam runner for Apache Apex.
+ */
+package org.apache.beam.runners.apex.examples;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
index 6b181ba..7defc77 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslatorTest.java
@@ -18,9 +18,16 @@
package org.apache.beam.runners.apex.translators;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -30,21 +37,13 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
-
-import com.google.common.collect.Sets;
-
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
/**
- * integration test for {@link FlattenPCollectionTranslator}.
+ * Integration test for {@link FlattenPCollectionTranslator}.
*/
public class FlattenPCollectionTranslatorTest {
private static final Logger LOG = LoggerFactory.getLogger(FlattenPCollectionTranslatorTest.class);
@@ -70,29 +69,30 @@ public class FlattenPCollectionTranslatorTest {
PCollection<String> actual = PCollectionList.of(pcList).apply(Flatten.<String>pCollections());
actual.apply(ParDo.of(new EmbeddedCollector()));
- ApexRunnerResult result = (ApexRunnerResult)p.run();
+ ApexRunnerResult result = (ApexRunnerResult) p.run();
// TODO: verify translation
result.getApexDAG();
long timeout = System.currentTimeMillis() + 30000;
- while (System.currentTimeMillis() < timeout && EmbeddedCollector.results.size() < expected.size()) {
+ while (System.currentTimeMillis() < timeout
+ && EmbeddedCollector.RESULTS.size() < expected.size()) {
LOG.info("Waiting for expected results.");
Thread.sleep(500);
}
- Assert.assertEquals("number results", expected.size(), EmbeddedCollector.results.size());
- Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.results));
+ Assert.assertEquals("number results", expected.size(), EmbeddedCollector.RESULTS.size());
+ Assert.assertEquals(expected, Sets.newHashSet(EmbeddedCollector.RESULTS));
}
@SuppressWarnings("serial")
private static class EmbeddedCollector extends OldDoFn<Object, Void> {
- protected static final ArrayList<Object> results = new ArrayList<>();
+ protected static final ArrayList<Object> RESULTS = new ArrayList<>();
public EmbeddedCollector() {
}
@Override
public void processElement(ProcessContext c) throws Exception {
- results.add(c.element());
+ RESULTS.add(c.element());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
index e4d4606..cb764d6 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslatorTest.java
@@ -18,9 +18,22 @@
package org.apache.beam.runners.apex.translators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -35,28 +48,13 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
-
-import com.datatorrent.api.DAG;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
/**
- * integration test for {@link GroupByKeyTranslator}.
+ * Integration test for {@link GroupByKeyTranslator}.
*/
public class GroupByKeyTranslatorTest {
@@ -94,31 +92,30 @@ public class GroupByKeyTranslatorTest {
.apply(ParDo.of(new EmbeddedCollector()))
;
- ApexRunnerResult result = (ApexRunnerResult)p.run();
- // TODO: verify translation
- DAG dag = result.getApexDAG();
+ ApexRunnerResult result = (ApexRunnerResult) p.run();
+ result.getApexDAG();
long timeout = System.currentTimeMillis() + 30000;
while (System.currentTimeMillis() < timeout) {
- if (EmbeddedCollector.results.containsAll(expected)) {
+ if (EmbeddedCollector.RESULTS.containsAll(expected)) {
break;
}
Thread.sleep(1000);
}
- Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
+ Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
}
@SuppressWarnings("serial")
private static class EmbeddedCollector extends OldDoFn<Object, Void> {
- protected static final HashSet<Object> results = new HashSet<>();
+ protected static final HashSet<Object> RESULTS = new HashSet<>();
public EmbeddedCollector() {
}
@Override
public void processElement(ProcessContext c) throws Exception {
- results.add(c.element());
+ RESULTS.add(c.element());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index b9748ee..ad22acd 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -21,6 +21,11 @@ package org.apache.beam.runners.apex.translators;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -56,11 +61,6 @@ import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.DAG;
-import com.datatorrent.lib.util.KryoCloneUtils;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
/**
* integration test for {@link ParDoBoundTranslator}.
*/
@@ -83,7 +83,7 @@ public class ParDoBoundTranslatorTest {
.apply(ParDo.of(new Add(5)))
.apply(ParDo.of(new EmbeddedCollector()));
- ApexRunnerResult result = (ApexRunnerResult)p.run();
+ ApexRunnerResult result = (ApexRunnerResult) p.run();
DAG dag = result.getApexDAG();
DAG.OperatorMeta om = dag.getOperatorMeta("Create.Values");
@@ -96,13 +96,13 @@ public class ParDoBoundTranslatorTest {
long timeout = System.currentTimeMillis() + 30000;
while (System.currentTimeMillis() < timeout) {
- if (EmbeddedCollector.results.containsAll(expected)) {
+ if (EmbeddedCollector.RESULTS.containsAll(expected)) {
break;
}
LOG.info("Waiting for expected results.");
Thread.sleep(1000);
}
- Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
+ Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
}
@SuppressWarnings("serial")
@@ -121,14 +121,14 @@ public class ParDoBoundTranslatorTest {
@SuppressWarnings("serial")
private static class EmbeddedCollector extends OldDoFn<Object, Void> {
- protected static final HashSet<Object> results = new HashSet<>();
+ protected static final HashSet<Object> RESULTS = new HashSet<>();
public EmbeddedCollector() {
}
@Override
public void processElement(ProcessContext c) throws Exception {
- results.add(c.element());
+ RESULTS.add(c.element());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
index f954537..71c5354 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
@@ -18,9 +18,20 @@
package org.apache.beam.runners.apex.translators;
+import com.datatorrent.api.DAG;
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
import org.apache.beam.runners.apex.translators.utils.CollectionSource;
import org.apache.beam.sdk.Pipeline;
@@ -30,23 +41,11 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
-
-import com.datatorrent.api.DAG;
-import com.google.common.collect.ContiguousSet;
-import com.google.common.collect.DiscreteDomain;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
/**
* integration test for {@link ReadUnboundedTranslator}.
*/
@@ -57,7 +56,7 @@ public class ReadUnboundTranslatorTest {
public void test() throws Exception {
ApexPipelineOptions options = PipelineOptionsFactory.create()
.as(ApexPipelineOptions.class);
- EmbeddedCollector.results.clear();
+ EmbeddedCollector.RESULTS.clear();
options.setApplicationName("ReadUnbound");
options.setRunner(ApexRunner.class);
Pipeline p = Pipeline.create(options);
@@ -67,7 +66,7 @@ public class ReadUnboundTranslatorTest {
p.apply(Read.from(source))
.apply(ParDo.of(new EmbeddedCollector()));
- ApexRunnerResult result = (ApexRunnerResult)p.run();
+ ApexRunnerResult result = (ApexRunnerResult) p.run();
DAG dag = result.getApexDAG();
DAG.OperatorMeta om = dag.getOperatorMeta("Read(CollectionSource)");
Assert.assertNotNull(om);
@@ -75,20 +74,20 @@ public class ReadUnboundTranslatorTest {
long timeout = System.currentTimeMillis() + 30000;
while (System.currentTimeMillis() < timeout) {
- if (EmbeddedCollector.results.containsAll(collection)) {
+ if (EmbeddedCollector.RESULTS.containsAll(collection)) {
break;
}
LOG.info("Waiting for expected results.");
Thread.sleep(1000);
}
- Assert.assertEquals(Sets.newHashSet(collection), EmbeddedCollector.results);
+ Assert.assertEquals(Sets.newHashSet(collection), EmbeddedCollector.RESULTS);
}
@Test
public void testReadBounded() throws Exception {
ApexPipelineOptions options = PipelineOptionsFactory.create()
.as(ApexPipelineOptions.class);
- EmbeddedCollector.results.clear();
+ EmbeddedCollector.RESULTS.clear();
options.setApplicationName("ReadBounded");
options.setRunner(ApexRunner.class);
Pipeline p = Pipeline.create(options);
@@ -97,7 +96,7 @@ public class ReadUnboundTranslatorTest {
p.apply(Read.from(CountingSource.upTo(10)))
.apply(ParDo.of(new EmbeddedCollector()));
- ApexRunnerResult result = (ApexRunnerResult)p.run();
+ ApexRunnerResult result = (ApexRunnerResult) p.run();
DAG dag = result.getApexDAG();
DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)");
Assert.assertNotNull(om);
@@ -105,25 +104,25 @@ public class ReadUnboundTranslatorTest {
long timeout = System.currentTimeMillis() + 30000;
while (System.currentTimeMillis() < timeout) {
- if (EmbeddedCollector.results.containsAll(expected)) {
+ if (EmbeddedCollector.RESULTS.containsAll(expected)) {
break;
}
LOG.info("Waiting for expected results.");
Thread.sleep(1000);
}
- Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.results);
+ Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
}
@SuppressWarnings("serial")
private static class EmbeddedCollector extends OldDoFn<Object, Void> {
- protected static final HashSet<Object> results = new HashSet<>();
+ protected static final HashSet<Object> RESULTS = new HashSet<>();
public EmbeddedCollector() {
}
@Override
public void processElement(ProcessContext c) throws Exception {
- results.add(c.element());
+ RESULTS.add(c.element());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
index a1e8b3e..c368bb2 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/CollectionSource.java
@@ -18,12 +18,6 @@
package org.apache.beam.runners.apex.translators.utils;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
@@ -34,11 +28,16 @@ import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
/**
* collection as {@link UnboundedSource}, used for tests.
*/
public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
-
+ private static final long serialVersionUID = 1L;
private final Collection<T> collection;
private final Coder<T> coder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
index e2fa9d9..e67efa9 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/utils/PipelineOptionsTest.java
@@ -17,29 +17,31 @@
*/
package org.apache.beam.runners.apex.translators.utils;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import com.datatorrent.common.util.FSStorageAgent;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
/**
* Tests the serialization of PipelineOptions.
*/
public class PipelineOptionsTest {
+ /**
+ * Interface for testing.
+ */
public interface MyOptions extends ApexPipelineOptions {
@Description("Bla bla bla")
@Default.String("Hello")
@@ -60,7 +62,7 @@ public class PipelineOptionsTest {
private static MyOptions options;
- private final static String[] args = new String[]{"--testOption=nothing"};
+ private static final String[] args = new String[]{"--testOption=nothing"};
@BeforeClass
public static void beforeTest() {
@@ -74,7 +76,7 @@ public class PipelineOptionsTest {
FSStorageAgent.store(bos, wrapper);
ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- MyOptionsWrapper wrapperCopy = (MyOptionsWrapper)FSStorageAgent.retrieve(bis);
+ MyOptionsWrapper wrapperCopy = (MyOptionsWrapper) FSStorageAgent.retrieve(bis);
assertNotNull(wrapperCopy.options);
assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/resources/log4j.properties b/runners/apex/src/test/resources/log4j.properties
index c0efc5d..d1e6b44 100644
--- a/runners/apex/src/test/resources/log4j.properties
+++ b/runners/apex/src/test/resources/log4j.properties
@@ -18,16 +18,18 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-log4j.rootLogger=DEBUG, testlogger
+log4j.rootLogger=OFF, testlogger
# A1 is set to be a ConsoleAppender.
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.testlogger.threshold=${test.log.threshold}
+test.log.threshold=DEBUG
-log4j.logger.org=debug
+log4j.logger.org=info
log4j.logger.org.apache.commons.beanutils=warn
log4j.logger.com.datatorrent=info
-log4j.logger.org.apache.apex=debug
+log4j.logger.org.apache.apex=info
log4j.logger.org.apache.beam.runners.apex=debug