You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/14 21:02:44 UTC
[1/2] incubator-beam git commit: Perform initial splitting in the
DirectRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master 86d222aab -> 0a413e78e
Perform initial splitting in the DirectRunner
The DirectRunner now generates initial splits for root sources, which allows
those sources to be read in parallel.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f68fea02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f68fea02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f68fea02
Branch: refs/heads/master
Commit: f68fea02b63e5844b9ccbd31ff8e02da407f65b7
Parents: 86d222a
Author: Thomas Groh <tg...@google.com>
Authored: Wed Oct 5 16:11:21 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Oct 14 13:54:55 2016 -0700
----------------------------------------------------------------------
.../direct/BoundedReadEvaluatorFactory.java | 40 ++-
.../beam/runners/direct/DirectOptions.java | 23 ++
.../beam/runners/direct/DirectRunner.java | 11 +-
.../beam/runners/direct/EmptyInputProvider.java | 12 +-
.../direct/ExecutorServiceParallelExecutor.java | 15 +-
.../beam/runners/direct/RootInputProvider.java | 7 +-
.../runners/direct/RootProviderRegistry.java | 5 +-
.../direct/TestStreamEvaluatorFactory.java | 4 +-
.../direct/TransformEvaluatorRegistry.java | 10 +-
.../direct/UnboundedReadEvaluatorFactory.java | 35 ++-
.../beam/runners/direct/WatermarkManager.java | 1 +
.../direct/BoundedReadEvaluatorFactoryTest.java | 41 ++-
.../direct/FlattenEvaluatorFactoryTest.java | 9 +-
.../direct/TestStreamEvaluatorFactoryTest.java | 2 +-
.../UnboundedReadEvaluatorFactoryTest.java | 55 +++-
.../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 69 ++++-
.../io/gcp/bigquery/BigQueryAvroUtilsTest.java | 132 +++++++--
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 292 ++++++++++++++++++-
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 9 +-
19 files changed, 662 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 326a535..843dcd6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -18,28 +18,32 @@
package org.apache.beam.runners.direct;
import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.StepTransformResult.Builder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
* for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
*/
final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(BoundedReadEvaluatorFactory.class);
private final EvaluationContext evaluationContext;
BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
@@ -126,18 +130,32 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
- return createInitialSplits((AppliedPTransform) transform);
+ public Collection<CommittedBundle<?>> getInitialInputs(
+ AppliedPTransform<?, ?, ?> transform, int targetParallelism) throws Exception {
+ return createInitialSplits((AppliedPTransform) transform, targetParallelism);
}
- private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
- AppliedPTransform<?, ?, Read.Bounded<OutputT>> transform) {
+ private <OutputT>
+ Collection<CommittedBundle<BoundedSourceShard<OutputT>>> createInitialSplits(
+ AppliedPTransform<?, ?, Bounded<OutputT>> transform, int targetParallelism)
+ throws Exception {
BoundedSource<OutputT> source = transform.getTransform().getSource();
- return Collections.<CommittedBundle<?>>singleton(
- evaluationContext
- .<BoundedSourceShard<OutputT>>createRootBundle()
- .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source)))
- .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ PipelineOptions options = evaluationContext.getPipelineOptions();
+ long estimatedBytes = source.getEstimatedSizeBytes(options);
+ long bytesPerBundle = estimatedBytes / targetParallelism;
+ List<? extends BoundedSource<OutputT>> bundles =
+ source.splitIntoBundles(bytesPerBundle, options);
+ ImmutableList.Builder<CommittedBundle<BoundedSourceShard<OutputT>>> shards =
+ ImmutableList.builder();
+ for (BoundedSource<OutputT> bundle : bundles) {
+ CommittedBundle<BoundedSourceShard<OutputT>> inputShard =
+ evaluationContext
+ .<BoundedSourceShard<OutputT>>createRootBundle()
+ .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(bundle)))
+ .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ shards.add(inputShard);
+ }
+ return shards.build();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index 89e1bb8..b2c4f47 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -62,4 +63,26 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
+ "PCollection are encodable. All elements in a PCollection must be encodable.")
boolean isEnforceEncodability();
void setEnforceEncodability(boolean test);
+
+ @Default.InstanceFactory(AvailableParallelismFactory.class)
+ @Description(
+ "Controls the amount of target parallelism the DirectRunner will use. Defaults to"
+ + " the greater of the number of available processors and 3. Must be a value greater"
+ + " than zero.")
+ int getTargetParallelism();
+ void setTargetParallelism(int target);
+
+ /**
+ * A {@link DefaultValueFactory} that returns the greater of {@link Runtime#availableProcessors()}
+ * and 3 from the {@link #create(PipelineOptions)} method. Uses {@link Runtime#getRuntime()} to
+ * obtain the {@link Runtime}.
+ */
+ class AvailableParallelismFactory implements DefaultValueFactory<Integer> {
+ private static final int MIN_PARALLELISM = 3;
+
+ @Override
+ public Integer create(PipelineOptions options) {
+ return Math.max(Runtime.getRuntime().availableProcessors(), MIN_PARALLELISM);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 8941093..6ef2472 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -186,7 +186,7 @@ public class DirectRunner
////////////////////////////////////////////////////////////////////////////////////////////////
private final DirectOptions options;
- private Supplier<ExecutorService> executorServiceSupplier = new FixedThreadPoolSupplier();
+ private Supplier<ExecutorService> executorServiceSupplier;
private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();
public static DirectRunner fromOptions(PipelineOptions options) {
@@ -195,6 +195,7 @@ public class DirectRunner
private DirectRunner(DirectOptions options) {
this.options = options;
+ this.executorServiceSupplier = new FixedThreadPoolSupplier(options);
}
/**
@@ -440,9 +441,15 @@ public class DirectRunner
* {@link Executors#newFixedThreadPool(int)}.
*/
private static class FixedThreadPoolSupplier implements Supplier<ExecutorService> {
+ private final DirectOptions options;
+
+ private FixedThreadPoolSupplier(DirectOptions options) {
+ this.options = options;
+ }
+
@Override
public ExecutorService get() {
- return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ return Executors.newFixedThreadPool(options.getTargetParallelism());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
index 10d63e9..1058943 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -21,8 +21,6 @@ import java.util.Collection;
import java.util.Collections;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
/**
* A {@link RootInputProvider} that provides a singleton empty bundle.
@@ -37,13 +35,11 @@ class EmptyInputProvider implements RootInputProvider {
/**
* {@inheritDoc}.
*
- * <p>Returns a single empty bundle. This bundle ensures that any {@link PTransform PTransforms}
- * that consume from the output of the provided {@link AppliedPTransform} have watermarks updated
- * as appropriate.
+ * <p>Returns an empty collection.
*/
@Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
- return Collections.<CommittedBundle<?>>singleton(
- evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ public Collection<CommittedBundle<?>> getInitialInputs(
+ AppliedPTransform<?, ?, ?> transform, int targetParallelism) {
+ return Collections.emptyList();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 3761574..3274524 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.direct;
-import static com.google.common.base.Preconditions.checkState;
-
import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
@@ -51,6 +49,7 @@ import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -166,12 +165,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
for (AppliedPTransform<?, ?, ?> root : roots) {
ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
- Collection<CommittedBundle<?>> initialInputs = rootInputProvider.getInitialInputs(root);
- checkState(
- !initialInputs.isEmpty(),
- "All root transforms must have initial inputs. Got 0 for %s",
- root.getFullName());
- pending.addAll(initialInputs);
+ try {
+ Collection<CommittedBundle<?>> initialInputs = rootInputProvider.getInitialInputs(root, 1);
+ pending.addAll(initialInputs);
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
pendingRootBundles.put(root, pending);
}
evaluationContext.initialize(pendingRootBundles);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
index 40c7301..19d0040 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java
@@ -36,6 +36,11 @@ interface RootInputProvider {
* <p>For source transforms, these should be sufficient that, when provided to the evaluators
* produced by {@link TransformEvaluatorFactory#forApplication(AppliedPTransform,
* CommittedBundle)}, all of the elements contained in the source are eventually produced.
+ *
+ * @param transform the {@link AppliedPTransform} to get initial inputs for.
+ * @param targetParallelism the target amount of parallelism to obtain from the source. Must be
+ * greater than or equal to 1.
*/
- Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform);
+ Collection<CommittedBundle<?>> getInitialInputs(
+ AppliedPTransform<?, ?, ?> transform, int targetParallelism) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
index f6335fd..bb5fcd2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
@@ -52,7 +52,8 @@ class RootProviderRegistry implements RootInputProvider {
}
@Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+ public Collection<CommittedBundle<?>> getInitialInputs(
+ AppliedPTransform<?, ?, ?> transform, int targetParallelism) throws Exception {
Class<? extends PTransform> transformClass = transform.getTransform().getClass();
RootInputProvider provider =
checkNotNull(
@@ -60,6 +61,6 @@ class RootProviderRegistry implements RootInputProvider {
"Tried to get a %s for a Transform of type %s, but there is no such provider",
RootInputProvider.class.getSimpleName(),
transformClass.getSimpleName());
- return provider.getInitialInputs(transform);
+ return provider.getInitialInputs(transform, targetParallelism);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 4a48a58..065adc1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -199,7 +199,8 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
+ public Collection<CommittedBundle<?>> getInitialInputs(
+ AppliedPTransform<?, ?, ?> transform, int targetParallelism) {
return createInputBundle((AppliedPTransform) transform);
}
@@ -213,6 +214,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
return Collections.<CommittedBundle<?>>singleton(initialBundle);
}
}
+
@AutoValue
abstract static class TestStreamIndex<T> {
static <T> TestStreamIndex<T> of(TestStream<T> stream) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 4b495e6..3dd44a7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.ImmutableMap;
@@ -81,14 +82,13 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
throws Exception {
checkState(
!finished.get(), "Tried to get an evaluator for a finished TransformEvaluatorRegistry");
- TransformEvaluatorFactory factory = getFactory(application);
+ Class<? extends PTransform> transformClass = application.getTransform().getClass();
+ TransformEvaluatorFactory factory =
+ checkNotNull(
+ factories.get(transformClass), "No evaluator for PTransform type %s", transformClass);
return factory.forApplication(application, inputBundle);
}
- private TransformEvaluatorFactory getFactory(AppliedPTransform<?, ?, ?> application) {
- return factories.get(application.getTransform().getClass());
- }
-
@Override
public void cleanup() throws Exception {
Collection<Exception> thrownInCleanup = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 08dc286..18d3d0a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -19,9 +19,11 @@ package org.apache.beam.runners.direct;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -33,20 +35,22 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
* for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
*/
class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadEvaluatorFactory.class);
// Occasionally close an existing reader and resume from checkpoint, to exercise close-and-resume
- @VisibleForTesting static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
+ private static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
private final EvaluationContext evaluationContext;
private final double readerReuseChance;
@@ -253,24 +257,33 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
}
@Override
- public Collection<CommittedBundle<?>> getInitialInputs(AppliedPTransform<?, ?, ?> transform) {
- return createInitialSplits((AppliedPTransform) transform);
+ public Collection<CommittedBundle<?>> getInitialInputs(
+ AppliedPTransform<?, ?, ?> transform, int targetParallelism) throws Exception {
+ return createInitialSplits((AppliedPTransform) transform, targetParallelism);
}
private <OutputT> Collection<CommittedBundle<?>> createInitialSplits(
- AppliedPTransform<?, ?, Read.Unbounded<OutputT>> transform) {
+ AppliedPTransform<?, ?, Unbounded<OutputT>> transform, int targetParallelism)
+ throws Exception {
UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
+ List<? extends UnboundedSource<OutputT, ?>> splits =
+ source.generateInitialSplits(targetParallelism, evaluationContext.getPipelineOptions());
UnboundedReadDeduplicator deduplicator =
source.requiresDeduping()
? UnboundedReadDeduplicator.CachedIdDeduplicator.create()
: NeverDeduplicator.create();
- UnboundedSourceShard<OutputT, ?> shard = UnboundedSourceShard.unstarted(source, deduplicator);
- return Collections.<CommittedBundle<?>>singleton(
- evaluationContext
- .<UnboundedSourceShard<?, ?>>createRootBundle()
- .add(WindowedValue.<UnboundedSourceShard<?, ?>>valueInGlobalWindow(shard))
- .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ ImmutableList.Builder<CommittedBundle<?>> initialShards = ImmutableList.builder();
+ for (UnboundedSource<OutputT, ?> split : splits) {
+ UnboundedSourceShard<OutputT, ?> shard =
+ UnboundedSourceShard.unstarted(split, deduplicator);
+ initialShards.add(
+ evaluationContext
+ .<UnboundedSourceShard<?, ?>>createRootBundle()
+ .add(WindowedValue.<UnboundedSourceShard<?, ?>>valueInGlobalWindow(shard))
+ .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ }
+ return initialShards.build();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 82a6e4f..c55a036 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -799,6 +799,7 @@ public class WatermarkManager {
for (CommittedBundle<?> initialBundle : rootEntry.getValue()) {
rootWms.addPending(initialBundle);
}
+ pendingRefreshes.offer(rootEntry.getKey());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index ee17eae..8a76a53 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -17,10 +17,14 @@
*/
package org.apache.beam.runners.direct;
+import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
@@ -43,9 +47,11 @@ import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
@@ -56,6 +62,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/**
* Tests for {@link BoundedReadEvaluatorFactory}.
@@ -87,7 +95,7 @@ public class BoundedReadEvaluatorFactoryTest {
Collection<CommittedBundle<?>> initialInputs =
new BoundedReadEvaluatorFactory.InputProvider(context)
- .getInitialInputs(longs.getProducingTransformInternal());
+ .getInitialInputs(longs.getProducingTransformInternal(), 1);
List<WindowedValue<?>> outputs = new ArrayList<>();
for (CommittedBundle<?> shardBundle : initialInputs) {
TransformEvaluator<?> evaluator =
@@ -115,6 +123,37 @@ public class BoundedReadEvaluatorFactoryTest {
}
@Test
+ public void getInitialInputsSplitsIntoBundles() throws Exception {
+ when(context.createRootBundle())
+ .thenAnswer(
+ new Answer<UncommittedBundle<?>>() {
+ @Override
+ public UncommittedBundle<?> answer(InvocationOnMock invocation) throws Throwable {
+ return bundleFactory.createRootBundle();
+ }
+ });
+ Collection<CommittedBundle<?>> initialInputs =
+ new BoundedReadEvaluatorFactory.InputProvider(context)
+ .getInitialInputs(longs.getProducingTransformInternal(), 3);
+
+ assertThat(initialInputs, hasSize(allOf(greaterThanOrEqualTo(3), lessThanOrEqualTo(4))));
+
+ Collection<BoundedSource<Long>> sources = new ArrayList<>();
+ for (CommittedBundle<?> initialInput : initialInputs) {
+ Iterable<WindowedValue<BoundedSourceShard<Long>>> shards =
+ (Iterable) initialInput.getElements();
+ WindowedValue<BoundedSourceShard<Long>> shard = Iterables.getOnlyElement(shards);
+ assertThat(shard.getWindows(), Matchers.<BoundedWindow>contains(GlobalWindow.INSTANCE));
+ assertThat(shard.getTimestamp(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+ sources.add(shard.getValue().getSource());
+ }
+
+ SourceTestUtils.assertSourcesEqualReferenceSource(source,
+ (List<? extends BoundedSource<Long>>) sources,
+ PipelineOptionsFactory.create());
+ }
+
+ @Test
public void boundedSourceInMemoryTransformEvaluatorShardsOfSource() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
List<? extends BoundedSource<Long>> splits =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index aa7b178..417aa64 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -24,13 +24,13 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.Iterables;
-import java.util.Collection;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
@@ -122,17 +122,14 @@ public class FlattenEvaluatorFactoryTest {
PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections());
EvaluationContext evaluationContext = mock(EvaluationContext.class);
- when(evaluationContext.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
when(evaluationContext.createBundle(flattened))
.thenReturn(bundleFactory.createBundle(flattened));
FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext);
- Collection<CommittedBundle<?>> initialInputs =
- new EmptyInputProvider(evaluationContext)
- .getInitialInputs(flattened.getProducingTransformInternal());
TransformEvaluator<Integer> emptyEvaluator =
factory.forApplication(
- flattened.getProducingTransformInternal(), Iterables.getOnlyElement(initialInputs));
+ flattened.getProducingTransformInternal(),
+ bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
TransformResult leftSideResult = emptyEvaluator.finishBundle();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 60b9c79..94a0d41 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -82,7 +82,7 @@ public class TestStreamEvaluatorFactoryTest {
Collection<CommittedBundle<?>> initialInputs =
new TestStreamEvaluatorFactory.InputProvider(context)
- .getInitialInputs(streamVals.getProducingTransformInternal());
+ .getInitialInputs(streamVals.getProducingTransformInternal(), 1);
@SuppressWarnings("unchecked")
CommittedBundle<TestStreamIndex<Integer>> initialBundle =
(CommittedBundle<TestStreamIndex<Integer>>) Iterables.getOnlyElement(initialInputs);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index b78fbeb..76acb03 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
@@ -33,10 +34,12 @@ import com.google.common.collect.Range;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -51,9 +54,12 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
@@ -66,6 +72,9 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
/**
* Tests for {@link UnboundedReadEvaluatorFactory}.
*/
@@ -93,12 +102,51 @@ public class UnboundedReadEvaluatorFactoryTest {
}
@Test
+ public void generatesInitialSplits() throws Exception {
+ when(context.createRootBundle()).thenAnswer(new Answer<UncommittedBundle<?>>() {
+ @Override
+ public UncommittedBundle<?> answer(InvocationOnMock invocation) throws Throwable {
+ return bundleFactory.createRootBundle();
+ }
+ });
+
+ int numSplits = 5;
+ Collection<CommittedBundle<?>> initialInputs =
+ new UnboundedReadEvaluatorFactory.InputProvider(context)
+ .getInitialInputs(longs.getProducingTransformInternal(), numSplits);
+ // CountingSource.unbounded has very good splitting behavior
+ assertThat(initialInputs, hasSize(numSplits));
+
+ int readPerSplit = 100;
+ int totalSize = numSplits * readPerSplit;
+ Set<Long> expectedOutputs =
+ ContiguousSet.create(Range.closedOpen(0L, (long) totalSize), DiscreteDomain.longs());
+
+ Collection<Long> readItems = new ArrayList<>(totalSize);
+ for (CommittedBundle<?> initialInput : initialInputs) {
+ CommittedBundle<UnboundedSourceShard<Long, ?>> shardBundle =
+ (CommittedBundle<UnboundedSourceShard<Long, ?>>) initialInput;
+ WindowedValue<UnboundedSourceShard<Long, ?>> shard =
+ Iterables.getOnlyElement(shardBundle.getElements());
+ assertThat(shard.getTimestamp(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+ assertThat(shard.getWindows(), Matchers.<BoundedWindow>contains(GlobalWindow.INSTANCE));
+ UnboundedSource<Long, ?> shardSource = shard.getValue().getSource();
+ readItems.addAll(
+ SourceTestUtils.readNItemsFromUnstartedReader(
+ shardSource.createReader(
+ PipelineOptionsFactory.create(), null /* No starting checkpoint */),
+ readPerSplit));
+ }
+ assertThat(readItems, containsInAnyOrder(expectedOutputs.toArray(new Long[0])));
+ }
+
+ @Test
public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
Collection<CommittedBundle<?>> initialInputs =
new UnboundedReadEvaluatorFactory.InputProvider(context)
- .getInitialInputs(longs.getProducingTransformInternal());
+ .getInitialInputs(longs.getProducingTransformInternal(), 1);
CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs);
UnboundedSourceShard<Long, ?> inputShard =
@@ -143,7 +191,8 @@ public class UnboundedReadEvaluatorFactoryTest {
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
Collection<CommittedBundle<?>> initialInputs =
- new UnboundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(sourceTransform);
+ new UnboundedReadEvaluatorFactory.InputProvider(context)
+ .getInitialInputs(sourceTransform, 1);
UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
when(context.createBundle(pcollection)).thenReturn(output);
@@ -198,6 +247,8 @@ public class UnboundedReadEvaluatorFactoryTest {
.commit(Instant.now());
UnboundedReadEvaluatorFactory factory =
new UnboundedReadEvaluatorFactory(context, 1.0 /* Always reuse */);
+ new UnboundedReadEvaluatorFactory.InputProvider(context)
+ .getInitialInputs(pcollection.getProducingTransformInternal(), 1);
TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
factory.forApplication(sourceTransform, inputBundle);
evaluator.processElement(shard);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index 6a9ea6b..20dd2d0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -28,8 +28,8 @@ import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;
-
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
@@ -47,6 +47,19 @@ import org.joda.time.format.DateTimeFormatter;
*/
class BigQueryAvroUtils {
+ public static final ImmutableMap<String, Type> BIG_QUERY_TO_AVRO_TYPES =
+ ImmutableMap.<String, Type>builder()
+ .put("STRING", Type.STRING)
+ .put("BYTES", Type.BYTES)
+ .put("INTEGER", Type.LONG)
+ .put("FLOAT", Type.DOUBLE)
+ .put("BOOLEAN", Type.BOOLEAN)
+ .put("TIMESTAMP", Type.LONG)
+ .put("RECORD", Type.RECORD)
+ .put("DATE", Type.STRING)
+ .put("DATETIME", Type.STRING)
+ .put("TIME", Type.STRING)
+ .build();
/**
* Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
* immutable.
@@ -154,23 +167,10 @@ class BigQueryAvroUtils {
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
// INTEGER type maps to an Avro LONG type.
checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
- ImmutableMap<String, Type> fieldMap =
- ImmutableMap.<String, Type>builder()
- .put("STRING", Type.STRING)
- .put("BYTES", Type.BYTES)
- .put("INTEGER", Type.LONG)
- .put("FLOAT", Type.DOUBLE)
- .put("BOOLEAN", Type.BOOLEAN)
- .put("TIMESTAMP", Type.LONG)
- .put("RECORD", Type.RECORD)
- .put("DATE", Type.STRING)
- .put("DATETIME", Type.STRING)
- .put("TIME", Type.STRING)
- .build();
// Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
// is required, so it may not be null.
String bqType = fieldSchema.getType();
- Type expectedAvroType = fieldMap.get(bqType);
+ Type expectedAvroType = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
verifyNotNull(expectedAvroType, "Unsupported BigQuery type: %s", bqType);
verify(
avroType == expectedAvroType,
@@ -248,4 +248,72 @@ class BigQueryAvroUtils {
}
return convertRequiredField(unionTypes.get(1).getType(), fieldSchema, v);
}
+
+  /**
+   * Builds an Avro record schema named {@code schemaName} whose fields mirror the given BigQuery
+   * fields, in order.
+   *
+   * <p>Each field is translated by {@code convertField}; a {@code RECORD} field becomes a nested
+   * record produced by calling this method again with the field's name and sub-fields.
+   *
+   * @param schemaName name of the generated Avro record
+   * @param fieldSchemas BigQuery fields to translate
+   * @return the record schema, in the {@code org.apache.beam.sdk.io.gcp.bigquery} namespace
+   * @throws IllegalArgumentException if a field has an unsupported type or an unknown mode
+   */
+  static Schema toGenericAvroSchema(String schemaName, List<TableFieldSchema> fieldSchemas) {
+    List<Field> avroFields = new ArrayList<>();
+    for (TableFieldSchema bigQueryField : fieldSchemas) {
+      avroFields.add(convertField(bigQueryField));
+    }
+    return Schema.createRecord(
+        schemaName,
+        "org.apache.beam.sdk.io.gcp.bigquery",
+        "Translated Avro Schema for " + schemaName,
+        false,
+        avroFields);
+  }
+
+  /**
+   * Translates one BigQuery field into the equivalent Avro field.
+   *
+   * <p>The element type is looked up in {@link #BIG_QUERY_TO_AVRO_TYPES}. The mode then selects
+   * the wrapping: a missing mode or {@code NULLABLE} gives a union of null and the element type,
+   * {@code REQUIRED} gives the bare element type, and {@code REPEATED} gives an array of it.
+   *
+   * @throws IllegalArgumentException if the type has no Avro mapping or the mode is unknown
+   */
+  private static Field convertField(TableFieldSchema bigQueryField) {
+    String bqType = bigQueryField.getType();
+    Type avroType = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
+    if (avroType == null) {
+      // Without this guard a null type would be handed to Schema.create below.
+      throw new IllegalArgumentException(
+          String.format(
+              "Unsupported BigQuery type %s for field %s", bqType, bigQueryField.getName()));
+    }
+    Schema elementSchema;
+    if (avroType == Type.RECORD) {
+      elementSchema = toGenericAvroSchema(bigQueryField.getName(), bigQueryField.getFields());
+    } else {
+      elementSchema = Schema.create(avroType);
+    }
+    String mode = bigQueryField.getMode();
+    Schema fieldSchema;
+    if (mode == null || mode.equals("NULLABLE")) {
+      fieldSchema = Schema.createUnion(Schema.create(Type.NULL), elementSchema);
+    } else if (mode.equals("REQUIRED")) {
+      fieldSchema = elementSchema;
+    } else if (mode.equals("REPEATED")) {
+      fieldSchema = Schema.createArray(elementSchema);
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Unknown BigQuery Field Mode: %s", mode));
+    }
+    return new Field(
+        bigQueryField.getName(),
+        fieldSchema,
+        bigQueryField.getDescription(),
+        (Object) null /* Cast to avoid deprecated JsonNode constructor. */);
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index 1d3ea81..644c545 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -17,18 +17,22 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.BaseEncoding;
-
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.Nullable;
@@ -44,36 +48,37 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class BigQueryAvroUtilsTest {
+ private List<TableFieldSchema> subFields = Lists.<TableFieldSchema>newArrayList(
+ new TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"));
+ /*
+ * Note that the quality and quantity fields do not have their mode set, so they should default
+ * to NULLABLE. This is an important test of BigQuery semantics.
+ *
+ * All the other fields we set in this function are required on the Schema response.
+ *
+ * See https://cloud.google.com/bigquery/docs/reference/v2/tables#schema
+ */
+ private List<TableFieldSchema> fields =
+ Lists.newArrayList(
+ new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED"),
+ new TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"),
+ new TableFieldSchema().setName("quality").setType("FLOAT") /* default to NULLABLE */,
+ new TableFieldSchema().setName("quantity").setType("INTEGER") /* default to NULLABLE */,
+ new TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"),
+ new TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"),
+ new TableFieldSchema().setName("sound").setType("BYTES").setMode("NULLABLE"),
+ new TableFieldSchema().setName("anniversaryDate").setType("DATE").setMode("NULLABLE"),
+ new TableFieldSchema().setName("anniversaryDatetime")
+ .setType("DATETIME").setMode("NULLABLE"),
+ new TableFieldSchema().setName("anniversaryTime").setType("TIME").setMode("NULLABLE"),
+ new TableFieldSchema().setName("scion").setType("RECORD").setMode("NULLABLE")
+ .setFields(subFields),
+ new TableFieldSchema().setName("associates").setType("RECORD").setMode("REPEATED")
+ .setFields(subFields));
+
@Test
public void testConvertGenericRecordToTableRow() throws Exception {
TableSchema tableSchema = new TableSchema();
- List<TableFieldSchema> subFields = Lists.<TableFieldSchema>newArrayList(
- new TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"));
- /*
- * Note that the quality and quantity fields do not have their mode set, so they should default
- * to NULLABLE. This is an important test of BigQuery semantics.
- *
- * All the other fields we set in this function are required on the Schema response.
- *
- * See https://cloud.google.com/bigquery/docs/reference/v2/tables#schema
- */
- List<TableFieldSchema> fields =
- Lists.<TableFieldSchema>newArrayList(
- new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED"),
- new TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"),
- new TableFieldSchema().setName("quality").setType("FLOAT") /* default to NULLABLE */,
- new TableFieldSchema().setName("quantity").setType("INTEGER") /* default to NULLABLE */,
- new TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"),
- new TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"),
- new TableFieldSchema().setName("sound").setType("BYTES").setMode("NULLABLE"),
- new TableFieldSchema().setName("anniversaryDate").setType("DATE").setMode("NULLABLE"),
- new TableFieldSchema().setName("anniversaryDatetime")
- .setType("DATETIME").setMode("NULLABLE"),
- new TableFieldSchema().setName("anniversaryTime").setType("TIME").setMode("NULLABLE"),
- new TableFieldSchema().setName("scion").setType("RECORD").setMode("NULLABLE")
- .setFields(subFields),
- new TableFieldSchema().setName("associates").setType("RECORD").setMode("REPEATED")
- .setFields(subFields));
tableSchema.setFields(fields);
Schema avroSchema = AvroCoder.of(Bird.class).getSchema();
@@ -132,6 +137,77 @@ public class BigQueryAvroUtilsTest {
}
}
+ @Test
+ public void testConvertBigQuerySchemaToAvroSchema() {
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setFields(fields);
+ Schema avroSchema =
+ BigQueryAvroUtils.toGenericAvroSchema("testSchema", tableSchema.getFields());
+
+ assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Type.LONG)));
+ assertThat(
+ avroSchema.getField("species").schema(),
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))));
+ assertThat(
+ avroSchema.getField("quality").schema(),
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.DOUBLE))));
+ assertThat(
+ avroSchema.getField("quantity").schema(),
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.LONG))));
+ assertThat(
+ avroSchema.getField("birthday").schema(),
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.LONG))));
+ assertThat(
+ avroSchema.getField("flighted").schema(),
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BOOLEAN))));
+ assertThat(
+ avroSchema.getField("sound").schema(),
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.BYTES))));
+ assertThat(
+ avroSchema.getField("anniversaryDate").schema(),
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))));
+ assertThat(
+ avroSchema.getField("anniversaryDatetime").schema(),
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))));
+ assertThat(
+ avroSchema.getField("anniversaryTime").schema(),
+ equalTo(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING))));
+
+ assertThat(
+ avroSchema.getField("scion").schema(),
+ equalTo(
+ Schema.createUnion(
+ Schema.create(Type.NULL),
+ Schema.createRecord(
+ "scion",
+ "org.apache.beam.sdk.io.gcp.bigquery",
+ "Translated Avro Schema for scion",
+ false,
+ ImmutableList.of(
+ new Field(
+ "species",
+ Schema.createUnion(
+ Schema.create(Type.NULL), Schema.create(Type.STRING)),
+ null,
+ (Object) null))))));
+ assertThat(
+ avroSchema.getField("associates").schema(),
+ equalTo(
+ Schema.createArray(
+ Schema.createRecord(
+ "associates",
+ "org.apache.beam.sdk.io.gcp.bigquery",
+ "Translated Avro Schema for associates",
+ false,
+ ImmutableList.of(
+ new Field(
+ "species",
+ Schema.createUnion(
+ Schema.create(Type.NULL), Schema.create(Type.STRING)),
+ null,
+ (Object) null))))));
+ }
+
/**
* Pojo class used as the record type in tests.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 05a7c5c..9d63611 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.fromJsonString;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
@@ -32,7 +34,9 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
+import com.google.api.client.json.GenericJson;
import com.google.api.client.util.Data;
+import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -50,21 +54,36 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Strings;
+import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Table.Cell;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
@@ -110,7 +129,9 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.Transport;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -144,6 +165,7 @@ public class BigQueryIOTest implements Serializable {
Status.SUCCEEDED, new Job().setStatus(new JobStatus()),
Status.FAILED, new Job().setStatus(new JobStatus().setErrorResult(new ErrorProto())));
+
private static class FakeBigQueryServices implements BigQueryServices {
private String[] jsonTableRowReturns = new String[0];
@@ -290,25 +312,25 @@ public class BigQueryIOTest implements Serializable {
@Override
public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
throws InterruptedException, IOException {
- startJob(jobRef);
+ startJob(jobRef, loadConfig);
}
@Override
public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
throws InterruptedException, IOException {
- startJob(jobRef);
+ startJob(jobRef, extractConfig);
}
@Override
public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
throws IOException, InterruptedException {
- startJob(jobRef);
+ startJob(jobRef, query);
}
@Override
public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
throws IOException, InterruptedException {
- startJob(jobRef);
+ startJob(jobRef, copyConfig);
}
@Override
@@ -338,7 +360,8 @@ public class BigQueryIOTest implements Serializable {
}
}
- private void startJob(JobReference jobRef) throws IOException, InterruptedException {
+ private void startJob(JobReference jobRef, GenericJson config)
+ throws IOException, InterruptedException {
if (!Strings.isNullOrEmpty(executingProject)) {
checkArgument(
jobRef.getProjectId().equals(executingProject),
@@ -352,6 +375,11 @@ public class BigQueryIOTest implements Serializable {
throw (IOException) ret;
} else if (ret instanceof InterruptedException) {
throw (InterruptedException) ret;
+ } else if (ret instanceof SerializableFunction) {
+ SerializableFunction<GenericJson, Void> fn =
+ (SerializableFunction<GenericJson, Void>) ret;
+ fn.apply(config);
+ return;
} else {
return;
}
@@ -392,6 +420,178 @@ public class BigQueryIOTest implements Serializable {
"Exceeded expected number of calls: " + getJobReturns.length);
}
}
+
+ ////////////////////////////////// SERIALIZATION METHODS ////////////////////////////////////
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeObject(replaceJobsWithBytes(startJobReturns));
+ out.writeObject(replaceJobsWithBytes(pollJobReturns));
+ out.writeObject(replaceJobsWithBytes(getJobReturns));
+ out.writeObject(executingProject);
+ }
+
+    private Object[] replaceJobsWithBytes(Object[] objs) {
+      Object[] copy = Arrays.copyOf(objs, objs.length);
+      for (int i = 0; i < copy.length; i++) {
+        checkArgument(
+            copy[i] == null || copy[i] instanceof Serializable || copy[i] instanceof Job,
+            "Only serializable elements and jobs can be added to Job Returns");
+        if (copy[i] instanceof Job) {
+          try {
+            // Job is not serializable, so encode the job as a byte array.
+            copy[i] = Transport.getJsonFactory().toByteArray(copy[i]);
+          } catch (IOException e) {
+            throw new IllegalArgumentException(
+                String.format("Could not encode Job %s via available JSON factory", copy[i]), e);
+          }
+        }
+      }
+      return copy;
+    }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ this.startJobReturns = replaceBytesWithJobs(in.readObject());
+ this.pollJobReturns = replaceBytesWithJobs(in.readObject());
+ this.getJobReturns = replaceBytesWithJobs(in.readObject());
+ this.executingProject = (String) in.readObject();
+ }
+
+    private Object[] replaceBytesWithJobs(Object obj) throws IOException {
+      checkState(obj instanceof Object[]);
+      Object[] serialized = (Object[]) obj;
+      Object[] restored = Arrays.copyOf(serialized, serialized.length);
+      for (int idx = 0; idx < restored.length; idx++) {
+        if (!(restored[idx] instanceof byte[])) {
+          continue; // Only byte[] entries are treated as JSON-encoded jobs.
+        }
+        // Parse the JSON bytes back into a Job and store it in place of the bytes.
+        ByteArrayInputStream jobJson = new ByteArrayInputStream((byte[]) restored[idx]);
+        restored[idx] = Transport.getJsonFactory().createJsonParser(jobJson).parse(Job.class);
+      }
+      return restored;
+    }
+ }
+
+ /** A fake dataset service that can be serialized, for use in testReadFromTable. */
+ private static class FakeDatasetService implements DatasetService, Serializable {
+ private com.google.common.collect.Table<String, String, Map<String, Table>> tables =
+ HashBasedTable.create();
+
+ public FakeDatasetService withTable(
+ String projectId, String datasetId, String tableId, Table table) throws IOException {
+ Map<String, Table> dataset = tables.get(projectId, datasetId);
+ if (dataset == null) {
+ dataset = new HashMap<>();
+ tables.put(projectId, datasetId, dataset);
+ }
+ dataset.put(tableId, table);
+ return this;
+ }
+
+ @Override
+ public Table getTable(String projectId, String datasetId, String tableId)
+ throws InterruptedException, IOException {
+ Map<String, Table> dataset =
+ checkNotNull(
+ tables.get(projectId, datasetId),
+ "Tried to get a table %s:%s.%s from %s, but no such table was set",
+ projectId,
+ datasetId,
+ tableId,
+ FakeDatasetService.class.getSimpleName());
+ return checkNotNull(dataset.get(tableId),
+ "Tried to get a table %s:%s.%s from %s, but no such table was set",
+ projectId,
+ datasetId,
+ tableId,
+ FakeDatasetService.class.getSimpleName());
+ }
+
+ @Override
+ public void deleteTable(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Unsupported");
+ }
+
+ @Override
+ public boolean isTableEmpty(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException {
+ Long numBytes = getTable(projectId, datasetId, tableId).getNumBytes();
+ return numBytes == null || numBytes == 0L;
+ }
+
+ @Override
+ public Dataset getDataset(
+ String projectId, String datasetId) throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Unsupported");
+ }
+
+ @Override
+ public void createDataset(
+ String projectId, String datasetId, String location, String description)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Unsupported");
+ }
+
+ @Override
+ public void deleteDataset(String projectId, String datasetId)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Unsupported");
+ }
+
+ @Override
+ public long insertAll(
+ TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Unsupported");
+ }
+
+ ////////////////////////////////// SERIALIZATION METHODS ////////////////////////////////////
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeObject(replaceTablesWithBytes(this.tables));
+ }
+
+ private com.google.common.collect.Table<String, String, Map<String, byte[]>>
+ replaceTablesWithBytes(
+ com.google.common.collect.Table<String, String, Map<String, Table>> toCopy)
+ throws IOException {
+ com.google.common.collect.Table<String, String, Map<String, byte[]>> copy =
+ HashBasedTable.create();
+ for (Cell<String, String, Map<String, Table>> cell : toCopy.cellSet()) {
+ HashMap<String, byte[]> dataset = new HashMap<>();
+ copy.put(cell.getRowKey(), cell.getColumnKey(), dataset);
+ for (Map.Entry<String, Table> dsTables : cell.getValue().entrySet()) {
+ dataset.put(
+ dsTables.getKey(), Transport.getJsonFactory().toByteArray(dsTables.getValue()));
+ }
+ }
+ return copy;
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ com.google.common.collect.Table<String, String, Map<String, byte[]>> tablesTable =
+ (com.google.common.collect.Table<String, String, Map<String, byte[]>>) in.readObject();
+ this.tables = replaceBytesWithTables(tablesTable);
+ }
+
+ private com.google.common.collect.Table<String, String, Map<String, Table>>
+ replaceBytesWithTables(
+ com.google.common.collect.Table<String, String, Map<String, byte[]>> tablesTable)
+ throws IOException {
+ com.google.common.collect.Table<String, String, Map<String, Table>> copy =
+ HashBasedTable.create();
+ for (Cell<String, String, Map<String, byte[]>> cell : tablesTable.cellSet()) {
+ HashMap<String, Table> dataset = new HashMap<>();
+ copy.put(cell.getRowKey(), cell.getColumnKey(), dataset);
+ for (Map.Entry<String, byte[]> dsTables : cell.getValue().entrySet()) {
+ Table table =
+ Transport.getJsonFactory()
+ .createJsonParser(new ByteArrayInputStream(dsTables.getValue()))
+ .parse(Table.class);
+ dataset.put(dsTables.getKey(), table);
+ }
+ }
+ return copy;
+ }
}
@Rule public transient ExpectedException thrown = ExpectedException.none();
@@ -627,11 +827,54 @@ public class BigQueryIOTest implements Serializable {
bqOptions.setProject("defaultProject");
bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+ Job job = new Job();
+ JobStatus status = new JobStatus();
+ job.setStatus(status);
+ JobStatistics jobStats = new JobStatistics();
+ job.setStatistics(jobStats);
+ JobStatistics4 extract = new JobStatistics4();
+ jobStats.setExtract(extract);
+ extract.setDestinationUriFileCounts(ImmutableList.of(1L));
+
+ Table sometable = new Table();
+ sometable.setSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER"))));
+ sometable.setNumBytes(1024L * 1024L);
+ FakeDatasetService fakeDatasetService =
+ new FakeDatasetService()
+ .withTable("non-executing-project", "somedataset", "sometable", sometable);
+ SerializableFunction<Void, Schema> schemaGenerator =
+ new SerializableFunction<Void, Schema>() {
+ @Override
+ public Schema apply(Void input) {
+ return BigQueryAvroUtils.toGenericAvroSchema(
+ "sometable",
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER")));
+ }
+ };
+ Collection<Map<String, Object>> records =
+ ImmutableList.<Map<String, Object>>builder()
+ .add(ImmutableMap.<String, Object>builder().put("name", "a").put("number", 1L).build())
+ .add(ImmutableMap.<String, Object>builder().put("name", "b").put("number", 2L).build())
+ .add(ImmutableMap.<String, Object>builder().put("name", "c").put("number", 3L).build())
+ .build();
+
+ SerializableFunction<GenericJson, Void> onStartJob =
+ new WriteExtractFiles(schemaGenerator, records);
+
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService()
- .startJobReturns("done", "done")
+ .startJobReturns(onStartJob, "done")
+ .pollJobReturns(job)
.getJobReturns((Job) null)
.verifyExecutingProject(bqOptions.getProject()))
+ .withDatasetService(fakeDatasetService)
.readerReturns(
toJsonString(new TableRow().set("name", "a").set("number", 1)),
toJsonString(new TableRow().set("name", "b").set("number", 2)),
@@ -1701,4 +1944,41 @@ public class BigQueryIOTest implements Serializable {
return pathname.isFile();
}}).length);
}
+
+ private class WriteExtractFiles implements SerializableFunction<GenericJson, Void> { // Test helper: writes the configured records to every URI listed under "destinationUris" of its input. NOTE(review): non-static inner class, so each instance also references the enclosing test instance.
+ private final SerializableFunction<Void, Schema> schemaGenerator; // Called with null once per destination to obtain the schema used for writing.
+ private final Collection<Map<String, Object>> records; // Field-name -> value maps; the same collection is written to every destination.
+ // Stores both arguments as-is; no copying or validation is performed.
+ private WriteExtractFiles(
+ SerializableFunction<Void, Schema> schemaGenerator,
+ Collection<Map<String, Object>> records) {
+ this.schemaGenerator = schemaGenerator;
+ this.records = records;
+ }
+ // For each destination URI in the input, creates one output file and appends every record to it; throws IllegalStateException (wrapping the IOException) on I/O failure; always returns null.
+ @Override
+ public Void apply(GenericJson input) {
+ List<String> destinations = (List<String>) input.get("destinationUris"); // Unchecked cast: assumes the "destinationUris" entry is a List<String> - TODO confirm against the caller that supplies the input.
+ for (String destination : destinations) {
+ String newDest = destination.replace("*", "000000000000"); // Every "*" becomes twelve zeros; presumably the name of the first shard of a wildcard extract URI - verify.
+ Schema schema = schemaGenerator.apply(null); // Schema is regenerated for each destination rather than cached.
+ try (WritableByteChannel channel = IOChannelUtils.create(newDest, MimeTypes.BINARY); // try-with-resources: the channel and the writer below are both closed when the block exits.
+ DataFileWriter<GenericRecord> tableRowWriter =
+ new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))
+ .create(schema, Channels.newOutputStream(channel))) {
+ for (Map<String, Object> record : records) {
+ GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schema); // A fresh builder per record.
+ for (Map.Entry<String, Object> field : record.entrySet()) {
+ genericRecordBuilder.set(field.getKey(), field.getValue()); // Map key is used as the field name, map value as the field value.
+ }
+ tableRowWriter.append(genericRecordBuilder.build());
+ }
+ } catch (IOException e) { // Covers failures from creating the channel/writer as well as from appending and closing.
+ throw new IllegalStateException(
+ String.format("Could not create destination for extract job %s", destination), e); // NOTE(review): the message names the original pattern, not the substituted newDest that was actually opened.
+ }
+ }
+ return null; // Void has no instances, so null is the only value that can be returned.
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index f21e6c0..3ca2b64 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigtable;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Verify.verifyNotNull;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
@@ -222,6 +223,7 @@ public class BigtableIOTest {
public void testReadingEmptyTable() throws Exception {
final String table = "TEST-EMPTY-TABLE";
service.createTable(table);
+ service.setupSampleRowKeys(table, 1, 1L);
runReadTest(defaultRead.withTableId(table), new ArrayList<Row>());
logged.verifyInfo("Closing reader after reading 0 records.");
@@ -234,8 +236,9 @@ public class BigtableIOTest {
final int numRows = 1001;
List<Row> testRows = makeTableData(table, numRows);
+ service.setupSampleRowKeys(table, 3, 1000L);
runReadTest(defaultRead.withTableId(table), testRows);
- logged.verifyInfo(String.format("Closing reader after reading %d records.", numRows));
+ logged.verifyInfo(String.format("Closing reader after reading %d records.", numRows / 3));
}
/** A {@link Predicate} that a {@link Row Row's} key matches the given regex. */
@@ -284,6 +287,7 @@ public class BigtableIOTest {
ByteKey startKey = ByteKey.copyFrom("key000000100".getBytes());
ByteKey endKey = ByteKey.copyFrom("key000000300".getBytes());
+ service.setupSampleRowKeys(table, numRows / 10, "key000000100".length());
// Test prefix: [beginning, startKey).
final ByteKeyRange prefixRange = ByteKeyRange.ALL_KEYS.withEndKey(startKey);
List<Row> prefixRows = filterToRange(testRows, prefixRange);
@@ -336,6 +340,7 @@ public class BigtableIOTest {
RowFilter filter =
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex)).build();
+ service.setupSampleRowKeys(table, 5, 10L);
runReadTest(
defaultRead.withTableId(table).withRowFilter(filter), Lists.newArrayList(filteredRows));
@@ -743,7 +748,7 @@ public class BigtableIOTest {
@Override
public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) {
List<SampleRowKeysResponse> samples = sampleRowKeys.get(source.getTableId());
- checkArgument(samples != null, "No samples found for table %s", source.getTableId());
+ checkNotNull(samples, "No samples found for table %s", source.getTableId());
return samples;
}
[2/2] incubator-beam git commit: [BEAM-310] Perform initial splitting
in the DirectRunner
Posted by lc...@apache.org.
[BEAM-310] Perform initial splitting in the DirectRunner
This closes #1063
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a413e78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a413e78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a413e78
Branch: refs/heads/master
Commit: 0a413e78e8f937ab3c221b78be25037021685e0d
Parents: 86d222a f68fea0
Author: Luke Cwik <lc...@google.com>
Authored: Fri Oct 14 13:55:24 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Oct 14 13:55:24 2016 -0700
----------------------------------------------------------------------
.../direct/BoundedReadEvaluatorFactory.java | 40 ++-
.../beam/runners/direct/DirectOptions.java | 23 ++
.../beam/runners/direct/DirectRunner.java | 11 +-
.../beam/runners/direct/EmptyInputProvider.java | 12 +-
.../direct/ExecutorServiceParallelExecutor.java | 15 +-
.../beam/runners/direct/RootInputProvider.java | 7 +-
.../runners/direct/RootProviderRegistry.java | 5 +-
.../direct/TestStreamEvaluatorFactory.java | 4 +-
.../direct/TransformEvaluatorRegistry.java | 10 +-
.../direct/UnboundedReadEvaluatorFactory.java | 35 ++-
.../beam/runners/direct/WatermarkManager.java | 1 +
.../direct/BoundedReadEvaluatorFactoryTest.java | 41 ++-
.../direct/FlattenEvaluatorFactoryTest.java | 9 +-
.../direct/TestStreamEvaluatorFactoryTest.java | 2 +-
.../UnboundedReadEvaluatorFactoryTest.java | 55 +++-
.../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 69 ++++-
.../io/gcp/bigquery/BigQueryAvroUtilsTest.java | 132 +++++++--
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 292 ++++++++++++++++++-
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 9 +-
19 files changed, 662 insertions(+), 110 deletions(-)
----------------------------------------------------------------------