You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/26 03:46:29 UTC
[2/2] incubator-beam git commit: Register DisplayData from composite IO transforms
Register DisplayData from composite IO transforms
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15b11ede
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15b11ede
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15b11ede
Branch: refs/heads/master
Commit: 15b11edef1e24fbffaa37b57494746d53522e04e
Parents: ebc7035
Author: Scott Wegner <sw...@google.com>
Authored: Thu Apr 21 14:04:36 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 25 18:46:13 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 25 ++++++++++-
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 34 ++++++++++++++-
.../org/apache/beam/sdk/io/CountingInput.java | 22 ++++++++++
.../java/org/apache/beam/sdk/io/PubsubIO.java | 33 ++++++++++++++
.../java/org/apache/beam/sdk/io/TextIO.java | 27 +++++++++++-
.../apache/beam/sdk/io/bigtable/BigtableIO.java | 27 ++++++++++++
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 33 ++++++++++++++
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 46 ++++++++++++++++++++
.../apache/beam/sdk/io/CountingInputTest.java | 32 ++++++++++++++
.../org/apache/beam/sdk/io/PubsubIOTest.java | 44 +++++++++++++++++++
.../java/org/apache/beam/sdk/io/TextIOTest.java | 34 +++++++++++++++
.../beam/sdk/io/bigtable/BigtableIOTest.java | 39 ++++++++++++++++-
12 files changed, 392 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 8f76ae8..c706fc1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PCollection;
@@ -327,6 +328,14 @@ public class AvroIO {
}
@Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .addIfNotNull("filePattern", filepattern)
+ .addIfNotDefault("validation", validate, true);
+ }
+
+ @Override
protected Coder<T> getDefaultOutputCoder() {
return AvroCoder.of(type, schema);
}
@@ -467,6 +476,8 @@ public class AvroIO {
* @param <T> the type of each of the elements of the input PCollection
*/
public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
+ private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
/** The filename to write to. */
@Nullable
final String filenamePrefix;
@@ -485,7 +496,7 @@ public class AvroIO {
final boolean validate;
Bound(Class<T> type) {
- this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, type, null, true);
+ this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, type, null, true);
}
Bound(
@@ -679,6 +690,18 @@ public class AvroIO {
filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema))));
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .add("schema", type)
+ .addIfNotNull("filePrefix", filenamePrefix)
+ .addIfNotDefault("shardNameTemplate", shardTemplate, DEFAULT_SHARD_TEMPLATE)
+ .addIfNotDefault("fileSuffix", filenameSuffix, "")
+ .addIfNotDefault("numShards", numShards, 0)
+ .addIfNotDefault("validation", validate, true);
+ }
+
/**
* Returns the current shard name template string.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 9239514..1d5b993 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BigQueryServices;
import org.apache.beam.sdk.util.BigQueryServices.LoadService;
@@ -80,7 +81,6 @@ import com.google.common.collect.Lists;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -517,6 +517,20 @@ public class BigQueryIO {
return TableRowJsonCoder.of();
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ if (table != null) {
+ builder.add("table", toTableSpec(table));
+ }
+
+ builder
+ .addIfNotNull("query", query)
+ .addIfNotNull("flattenResults", flattenResults)
+ .addIfNotDefault("validation", validate, true);
+ }
+
static {
DirectPipelineRunner.registerDefaultTransformEvaluator(
Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() {
@@ -1048,6 +1062,24 @@ public class BigQueryIO {
return VoidCoder.of();
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder
+ .addIfNotNull("table", jsonTableRef)
+ .addIfNotNull("schema", jsonSchema);
+
+ if (tableRefFunction != null) {
+ builder.add("tableFn", tableRefFunction.getClass());
+ }
+
+ builder
+ .add("createDisposition", createDisposition.toString())
+ .add("writeDisposition", writeDisposition.toString())
+ .addIfNotDefault("validation", validate, true);
+ }
+
/** Returns the create disposition. */
public CreateDisposition getCreateDisposition() {
return createDisposition;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
index 55e6d96..f6b1256 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.io.CountingSource.NowTimestampFn;
import org.apache.beam.sdk.io.Read.Unbounded;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -114,6 +115,12 @@ public class CountingInput {
public PCollection<Long> apply(PBegin begin) {
return begin.apply(Read.from(CountingSource.upTo(numElements)));
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add("upTo", numElements);
+ }
}
/**
@@ -221,5 +228,20 @@ public class CountingInput {
read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
}
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder.add("timestampFn", timestampFn.getClass());
+
+ if (maxReadTime.isPresent()) {
+ builder.add("maxReadTime", maxReadTime.get());
+ }
+
+ if (maxNumRecords.isPresent()) {
+ builder.add("maxRecords", maxNumRecords.get());
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 970fb51..1b0ec4a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.Transport;
@@ -689,6 +690,25 @@ public class PubsubIO {
}
@Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder
+ .addIfNotNull("timestampLabel", timestampLabel)
+ .addIfNotNull("idLabel", idLabel)
+ .addIfNotNull("maxReadTime", maxReadTime)
+ .addIfNotDefault("maxNumRecords", maxNumRecords, 0);
+
+ if (topic != null) {
+ builder.add("topic", topic.asPath());
+ }
+
+ if (subscription != null) {
+ builder.add("subscription", subscription.asPath());
+ }
+ }
+
+ @Override
protected Coder<T> getDefaultOutputCoder() {
return coder;
}
@@ -974,6 +994,19 @@ public class PubsubIO {
}
@Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder
+ .addIfNotNull("timestampLabel", timestampLabel)
+ .addIfNotNull("idLabel", idLabel);
+
+ if (topic != null) {
+ builder.add("topic", topic.asPath());
+ }
+ }
+
+ @Override
protected Coder<Void> getDefaultOutputCoder() {
return VoidCoder.of();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index b95fe15..3882ee1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PCollection;
@@ -339,6 +340,16 @@ public class TextIO {
}
@Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder
+ .add("compressionType", compressionType.toString())
+ .addIfNotDefault("validation", validate, true)
+ .addIfNotNull("filePattern", filepattern);
+ }
+
+ @Override
protected Coder<T> getDefaultOutputCoder() {
return coder;
}
@@ -467,6 +478,8 @@ public class TextIO {
* @param <T> the type of the elements of the input PCollection
*/
public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
+ private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
/** The prefix of each file written, combined with suffix and shardTemplate. */
@Nullable private final String filenamePrefix;
/** The suffix of each file written, combined with prefix and shardTemplate. */
@@ -485,7 +498,7 @@ public class TextIO {
private final boolean validate;
Bound(Coder<T> coder) {
- this(null, null, "", coder, 0, ShardNameTemplate.INDEX_OF_MAX, true);
+ this(null, null, "", coder, 0, DEFAULT_SHARD_TEMPLATE, true);
}
private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder,
@@ -631,6 +644,18 @@ public class TextIO {
filenamePrefix, filenameSuffix, shardTemplate, coder)));
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder
+ .addIfNotNull("filePrefix", filenamePrefix)
+ .addIfNotDefault("fileSuffix", filenameSuffix, "")
+ .addIfNotDefault("shardNameTemplate", shardTemplate, DEFAULT_SHARD_TEMPLATE)
+ .addIfNotDefault("validation", validate, true)
+ .addIfNotDefault("numShards", numShards, 0);
+ }
+
/**
* Returns the current shard name template string.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
index 5177262..28ff335 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/bigtable/BigtableIO.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
@@ -262,6 +263,21 @@ public class BigtableIO {
}
@Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder.add("tableId", tableId);
+
+ if (options != null) {
+ builder.add("bigtableOptions", options.toString());
+ }
+
+ if (filter != null) {
+ builder.add("rowFilter", filter.toString());
+ }
+ }
+
+ @Override
public String toString() {
return MoreObjects.toStringHelper(Read.class)
.add("options", options)
@@ -429,6 +445,17 @@ public class BigtableIO {
}
@Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder.add("tableId", tableId);
+
+ if (options != null) {
+ builder.add("bigtableOptions", options.toString());
+ }
+ }
+
+ @Override
public String toString() {
return MoreObjects.toStringHelper(Write.class)
.add("options", options)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 57312c0..43b1219 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.io.AvroIO.Write.Bound;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.PCollection;
@@ -256,4 +259,34 @@ public class AvroIOTest {
}
// TODO: for Write only, test withSuffix,
// withShardNameTemplate and withoutSharding.
+
+ @Test
+ public void testReadDisplayData() {
+ AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
+ .withoutValidation();
+
+ DisplayData displayData = DisplayData.from(read);
+ assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
+ assertThat(displayData, hasDisplayItem("validation", false));
+ }
+
+ @Test
+ public void testWriteDisplayData() {
+ AvroIO.Write.Bound<?> write = AvroIO.Write
+ .to("foo")
+ .withShardNameTemplate("-SS-of-NN-")
+ .withSuffix("bar")
+ .withSchema(GenericClass.class)
+ .withNumShards(100)
+ .withoutValidation();
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
+ assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
+ assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
+ assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
+ assertThat(displayData, hasDisplayItem("numShards", 100));
+ assertThat(displayData, hasDisplayItem("validation", false));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index e1f8e4d..63ff22c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -17,8 +17,12 @@
*/
package org.apache.beam.sdk.io;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderException;
@@ -32,6 +36,7 @@ import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.BigQueryServices;
import org.apache.beam.sdk.util.BigQueryServices.Status;
@@ -391,6 +396,24 @@ public class BigQueryIOTest {
}
@Test
+ public void testBuildSourceDisplayData() {
+ String tableSpec = "project:dataset.tableid";
+
+ BigQueryIO.Read.Bound read = BigQueryIO.Read
+ .from(tableSpec)
+ .fromQuery("myQuery")
+ .withoutResultFlattening()
+ .withoutValidation();
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("table", tableSpec));
+ assertThat(displayData, hasDisplayItem("query", "myQuery"));
+ assertThat(displayData, hasDisplayItem("flattenResults", false));
+ assertThat(displayData, hasDisplayItem("validation", false));
+ }
+
+ @Test
public void testBuildSink() {
BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
.to("foo.com:project:somedataset.sometable");
@@ -502,6 +525,29 @@ public class BigQueryIOTest {
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
}
+ @Test
+ public void testBuildSinkDisplayData() {
+ String tableSpec = "project:dataset.table";
+ TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
+
+ BigQueryIO.Write.Bound write = BigQueryIO.Write
+ .to(tableSpec)
+ .withSchema(schema)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+ .withoutValidation();
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem(hasKey("table")));
+ assertThat(displayData, hasDisplayItem(hasKey("schema")));
+ assertThat(displayData,
+ hasDisplayItem("createDisposition", CreateDisposition.CREATE_IF_NEEDED.toString()));
+ assertThat(displayData,
+ hasDisplayItem("writeDisposition", WriteDisposition.WRITE_APPEND.toString()));
+ assertThat(displayData, hasDisplayItem("validation", false));
+ }
+
private void testWriteValidatesDataset(boolean streaming) {
BigQueryOptions options = PipelineOptionsFactory.as(BigQueryOptions.class);
options.setProject("someproject");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index b3b9b08..c08b6f8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.beam.sdk.io;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -30,9 +31,11 @@ import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
@@ -76,6 +79,13 @@ public class CountingInputTest {
}
@Test
+ public void testBoundedDisplayData() {
+ PTransform<?, ?> input = CountingInput.upTo(1234);
+ DisplayData displayData = DisplayData.from(input);
+ assertThat(displayData, hasDisplayItem("upTo", 1234));
+ }
+
+ @Test
@Category(RunnableOnService.class)
public void testUnboundedInput() {
Pipeline p = TestPipeline.create();
@@ -138,6 +148,28 @@ public class CountingInputTest {
p.run();
}
+ @Test
+ public void testUnboundedDisplayData() {
+ Duration maxReadTime = Duration.standardHours(5);
+ SerializableFunction<Long, Instant> timestampFn = new SerializableFunction<Long, Instant>() {
+ @Override
+ public Instant apply(Long input) {
+ return Instant.now();
+ }
+ };
+
+ PTransform<?, ?> input = CountingInput.unbounded()
+ .withMaxNumRecords(1234)
+ .withMaxReadTime(maxReadTime)
+ .withTimestampFn(timestampFn);
+
+ DisplayData displayData = DisplayData.from(input);
+
+ assertThat(displayData, hasDisplayItem("maxRecords", 1234));
+ assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
+ assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass()));
+ }
+
/**
* A timestamp function that uses the given value as the timestamp. Because the input values will
* not wrap, this function is non-decreasing and meets the timestamp function criteria laid out
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index 852ab7d..1e5bf51 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -17,12 +17,18 @@
*/
package org.apache.beam.sdk.io;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.transforms.display.DisplayData;
import com.google.api.client.testing.http.FixedClock;
import com.google.api.client.util.Clock;
import com.google.api.services.pubsub.model.PubsubMessage;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
@@ -231,4 +237,42 @@ public class PubsubIOTest {
// Year 10000 out of range.
parseTimestamp("10000-10-29T23:41:41.123999Z");
}
+
+ @Test
+ public void testReadDisplayData() {
+ String topic = "projects/project/topics/topic";
+ String subscription = "projects/project/subscriptions/subscription";
+ Duration maxReadTime = Duration.standardMinutes(5);
+ PubsubIO.Read.Bound<String> read = PubsubIO.Read
+ .topic(topic)
+ .subscription(subscription)
+ .timestampLabel("myTimestamp")
+ .idLabel("myId")
+ .maxNumRecords(1234)
+ .maxReadTime(maxReadTime);
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("topic", topic));
+ assertThat(displayData, hasDisplayItem("subscription", subscription));
+ assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ assertThat(displayData, hasDisplayItem("maxNumRecords", 1234));
+ assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
+ }
+
+ @Test
+ public void testWriteDisplayData() {
+ String topic = "projects/project/topics/topic";
+ PubsubIO.Write.Bound<?> write = PubsubIO.Write
+ .topic(topic)
+ .timestampLabel("myTimestamp")
+ .idLabel("myId");
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("topic", topic));
+ assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 09b7d22..4d6d8dd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.TestUtils.INTS_ARRAY;
import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
@@ -41,6 +42,7 @@ import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.PCollection;
@@ -155,6 +157,20 @@ public class TextIOTest {
}
}
+ @Test
+ public void testReadDisplayData() {
+ TextIO.Read.Bound<?> read = TextIO.Read
+ .from("foo.*")
+ .withCompressionType(CompressionType.BZIP2)
+ .withoutValidation();
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
+ assertThat(displayData, hasDisplayItem("compressionType", CompressionType.BZIP2.toString()));
+ assertThat(displayData, hasDisplayItem("validation", false));
+ }
+
<T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception {
runTestWrite(elems, coder, 1);
}
@@ -275,6 +291,24 @@ public class TextIOTest {
}
@Test
+ public void testWriteDisplayData() {
+ TextIO.Write.Bound<?> write = TextIO.Write
+ .to("foo")
+ .withSuffix("bar")
+ .withShardNameTemplate("-SS-of-NN-")
+ .withNumShards(100)
+ .withoutValidation();
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
+ assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
+ assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
+ assertThat(displayData, hasDisplayItem("numShards", 100));
+ assertThat(displayData, hasDisplayItem("validation", false));
+ }
+
+ @Test
public void testUnsupportedFilePattern() throws IOException {
File outFolder = tmpFolder.newFolder();
// Windows doesn't like resolving paths with * in them.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15b11ede/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
index 135fcb8..7c176e4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/bigtable/BigtableIOTest.java
@@ -20,7 +20,10 @@ package org.apache.beam.sdk.io.bigtable;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
-import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
+import static org.apache.beam.sdk.testing.SourceTestUtils
+ .assertSplitAtFractionSucceedsAndConsistent;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verifyNotNull;
@@ -39,6 +42,7 @@ import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -407,6 +411,26 @@ public class BigtableIOTest {
assertSourcesEqualReferenceSource(source, splits, null /* options */);
}
+ @Test
+ public void testReadingDisplayData() {
+ RowFilter rowFilter = RowFilter.newBuilder()
+ .setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*"))
+ .build();
+
+ BigtableIO.Read read = BigtableIO.read()
+ .withBigtableOptions(BIGTABLE_OPTIONS)
+ .withTableId("fooTable")
+ .withRowFilter(rowFilter);
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+ assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
+
+ // BigtableIO adds user-agent to options; assert only on key and not value.
+ assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
+ }
+
/** Tests that a record gets written to the service and messages are logged. */
@Test
public void testWriting() throws Exception {
@@ -463,6 +487,19 @@ public class BigtableIOTest {
p.run();
}
+ @Test
+ public void testWritingDisplayData() {
+ BigtableIO.Write write = BigtableIO.write()
+ .withTableId("fooTable")
+ .withBigtableOptions(BIGTABLE_OPTIONS);
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+ // BigtableIO adds user-agent to options; assert only on key and not value.
+ assertThat(displayData, hasDisplayItem(hasKey("bigtableOptions")));
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////
private static final String COLUMN_FAMILY_NAME = "family";
private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");