You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:53 UTC
[29/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
deleted file mode 100644
index 34efdf6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorServices.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.common.base.MoreObjects;
-
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Static factory methods for constructing instances of {@link TransformExecutorService}.
- */
-final class TransformExecutorServices {
-  private TransformExecutorServices() {
-    // Do not instantiate
-  }
-
-  /**
-   * Returns a {@link TransformExecutorService} that evaluates
-   * {@link TransformExecutor TransformExecutors} in parallel.
-   *
-   * @param executor the {@link ExecutorService} that scheduled work is submitted to
-   * @param scheduled map that each {@link TransformExecutor} is added to before it is submitted to
-   *     {@code executor}, and removed from when it is completed
-   */
-  public static TransformExecutorService parallel(
-      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-    return new ParallelEvaluationState(executor, scheduled);
-  }
-
-  /**
-   * Returns a {@link TransformExecutorService} that evaluates
-   * {@link TransformExecutor TransformExecutors} in serial, one at a time, in the order in which
-   * they were scheduled.
-   *
-   * @param executor the {@link ExecutorService} that scheduled work is submitted to
-   * @param scheduled map that each {@link TransformExecutor} is added to before it is submitted to
-   *     {@code executor}, and removed from when it is completed
-   */
-  public static TransformExecutorService serial(
-      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-    return new SerialEvaluationState(executor, scheduled);
-  }
-
-  /**
-   * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor}
-   * scheduled will be immediately submitted to the {@link ExecutorService}.
-   *
-   * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
-   * processed in parallel.
-   */
-  private static class ParallelEvaluationState implements TransformExecutorService {
-    private final ExecutorService executor;
-    private final Map<TransformExecutor<?>, Boolean> scheduled;
-
-    private ParallelEvaluationState(
-        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-      this.executor = executor;
-      this.scheduled = scheduled;
-    }
-
-    /** Records the work as scheduled and submits it to the executor. */
-    @Override
-    public void schedule(TransformExecutor<?> work) {
-      // Record before submitting: if the work were submitted first, a complete() call for it
-      // could run before this put, leaving a stale entry in the scheduled map.
-      scheduled.put(work, true);
-      executor.submit(work);
-    }
-
-    /** Removes the completed work from the scheduled map. */
-    @Override
-    public void complete(TransformExecutor<?> completed) {
-      scheduled.remove(completed);
-    }
-  }
-
-  /**
-   * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor}
-   * scheduled will be placed on the work queue. Only one item of work will be submitted to the
-   * {@link ExecutorService} at any time.
-   *
-   * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
-   * Keyed computations are processed serially per step.
-   */
-  private static class SerialEvaluationState implements TransformExecutorService {
-    private final ExecutorService executor;
-    private final Map<TransformExecutor<?>, Boolean> scheduled;
-
-    /** The work submitted to the executor and not yet completed, or {@code null} if none. */
-    private final AtomicReference<TransformExecutor<?>> currentlyEvaluating;
-    /** Work waiting to be submitted, in the order it was scheduled. */
-    private final Queue<TransformExecutor<?>> workQueue;
-
-    private SerialEvaluationState(
-        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-      this.scheduled = scheduled;
-      this.executor = executor;
-      this.currentlyEvaluating = new AtomicReference<>();
-      this.workQueue = new ConcurrentLinkedQueue<>();
-    }
-
-    /**
-     * Adds the work to the back of the work queue, then submits the head of the queue immediately
-     * if nothing is currently being evaluated.
-     */
-    @Override
-    public void schedule(TransformExecutor<?> work) {
-      workQueue.offer(work);
-      updateCurrentlyEvaluating();
-    }
-
-    /**
-     * Marks the currently evaluating work as complete and submits the next queued work, if any.
-     *
-     * @throws IllegalStateException if {@code completed} is not the work currently being evaluated
-     */
-    @Override
-    public void complete(TransformExecutor<?> completed) {
-      if (!currentlyEvaluating.compareAndSet(completed, null)) {
-        throw new IllegalStateException(
-            "Finished work "
-                + completed
-                + " but could not complete due to unexpected currently executing "
-                + currentlyEvaluating.get());
-      }
-      scheduled.remove(completed);
-      updateCurrentlyEvaluating();
-    }
-
-    /**
-     * Submits the head of the work queue if nothing is currently being evaluated.
-     *
-     * <p>{@code currentlyEvaluating} is only ever set to a non-null value while holding this
-     * object's monitor, so after re-checking it under the lock the polled work can always be
-     * installed. Work is therefore never polled and re-queued, which would move it behind work
-     * scheduled later and break FIFO order.
-     */
-    private void updateCurrentlyEvaluating() {
-      // Cheap unsynchronized check: only take the lock if there may be an opening.
-      if (currentlyEvaluating.get() != null) {
-        return;
-      }
-      synchronized (this) {
-        // Another thread may have installed work between the check above and acquiring the lock.
-        if (currentlyEvaluating.get() != null) {
-          return;
-        }
-        TransformExecutor<?> newWork = workQueue.poll();
-        if (newWork != null) {
-          currentlyEvaluating.set(newWork);
-          scheduled.put(newWork, true);
-          executor.submit(newWork);
-        }
-      }
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(SerialEvaluationState.class)
-          .add("currentlyEvaluating", currentlyEvaluating)
-          .add("workQueue", workQueue)
-          .toString();
-    }
-  }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
deleted file mode 100644
index 549afab..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
- * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
- */
-class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  /*
-   * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
-   * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
-   * and any splits are honored.
-   */
-  private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>>
-      sourceEvaluators = new ConcurrentHashMap<>();
-
-  /**
-   * Returns an evaluator for the provided application of {@link Unbounded Read.Unbounded}. The
-   * input bundle is not used; the returned evaluator reads from the transform's source.
-   */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) {
-    return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
-  }
-
-  /**
-   * Takes the next available {@link UnboundedReadEvaluator} for the transform from its queue, or
-   * returns an empty evaluator if the queue currently holds none. Evaluators return themselves to
-   * the queue at the end of {@link UnboundedReadEvaluator#finishBundle()}.
-   */
-  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
-    UnboundedReadEvaluator<?> currentEvaluator =
-        getTransformEvaluatorQueue(transform, evaluationContext).poll();
-    if (currentEvaluator == null) {
-      return EmptyTransformEvaluator.create(transform);
-    }
-    return currentEvaluator;
-  }
-
-  /**
-   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
-   * provided application of {@link Unbounded Read.Unbounded}, initializing it if required.
-   *
-   * <p>This method is thread-safe, and only one queue (with one initial evaluator) is ever
-   * published per key, even if several invocations race to initialize it.
-   */
-  private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
-      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-      final InProcessEvaluationContext evaluationContext) {
-    // Key by the application and the context the evaluation is occurring in (which call to
-    // Pipeline#run).
-    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
-    @SuppressWarnings("unchecked")
-    Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
-        (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
-    if (evaluatorQueue != null) {
-      return evaluatorQueue;
-    }
-    // Add the first evaluator before publishing the queue, so no concurrent caller can observe a
-    // newly published queue that has not yet received its initial evaluator.
-    Queue<UnboundedReadEvaluator<OutputT>> newQueue = new ConcurrentLinkedQueue<>();
-    newQueue.offer(new UnboundedReadEvaluator<OutputT>(transform, evaluationContext, newQueue));
-    @SuppressWarnings("unchecked")
-    Queue<UnboundedReadEvaluator<OutputT>> existingQueue =
-        (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.putIfAbsent(key, newQueue);
-    // If another invocation published a queue first, discard ours and use the existing one.
-    return existingQueue == null ? newQueue : existingQueue;
-  }
-
-  /**
-   * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
-   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
-   * creates the {@link UnboundedReader} and consumes some currently available input.
-   *
-   * <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be
-   * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own
-   * checkpoint, and constructs its reader from the current checkpoint in each call to
-   * {@link #finishBundle()}.
-   */
-  private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
-    /** Maximum number of elements read from the source in one call to finishBundle. */
-    private static final int ARBITRARY_MAX_ELEMENTS = 10;
-    private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
-    private final InProcessEvaluationContext evaluationContext;
-    /** The queue this evaluator returns itself to after each bundle it produces. */
-    private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
-    /** The checkpoint the next reader resumes from; {@code null} before the first bundle. */
-    private CheckpointMark checkpointMark;
-
-    public UnboundedReadEvaluator(
-        AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-        InProcessEvaluationContext evaluationContext,
-        Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
-      this.transform = transform;
-      this.evaluationContext = evaluationContext;
-      this.evaluatorQueue = evaluatorQueue;
-      this.checkpointMark = null;
-    }
-
-    @Override
-    public void processElement(WindowedValue<Object> element) {
-      // Input elements are ignored; output is read from the source in finishBundle().
-    }
-
-    /**
-     * Reads up to {@link #ARBITRARY_MAX_ELEMENTS} elements from a reader created at the current
-     * checkpoint, records and finalizes the reader's new checkpoint, and returns a result holding
-     * the output at the reader's watermark. This evaluator is then returned to its queue.
-     */
-    @Override
-    public InProcessTransformResult finishBundle() throws IOException {
-      UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput());
-      try (UnboundedReader<OutputT> reader =
-          createReader(
-              transform.getTransform().getSource(), evaluationContext.getPipelineOptions())) {
-        int numElements = 0;
-        if (reader.start()) {
-          do {
-            output.add(
-                WindowedValue.timestampedValueInGlobalWindow(
-                    reader.getCurrent(), reader.getCurrentTimestamp()));
-            numElements++;
-          } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
-        }
-        checkpointMark = reader.getCheckpointMark();
-        // NOTE(review): the checkpoint is finalized before this result (and its output bundle) is
-        // returned to the caller; confirm that ordering is intended.
-        checkpointMark.finalizeCheckpoint();
-        // TODO: When exercising create initial splits, make this the minimum watermark across all
-        // existing readers
-        StepTransformResult result =
-            StepTransformResult.withHold(transform, reader.getWatermark())
-                .addOutput(output)
-                .build();
-        // Make this evaluator, with its updated checkpoint, available for the next bundle.
-        // NOTE(review): if anything above throws, this evaluator is not returned to the queue.
-        evaluatorQueue.offer(this);
-        return result;
-      }
-    }
-
-    /** Creates a reader for {@code source} that resumes from the current checkpoint, if any. */
-    private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader(
-        UnboundedSource<OutputT, CheckpointMarkT> source, PipelineOptions options) {
-      @SuppressWarnings("unchecked")
-      CheckpointMarkT mark = (CheckpointMarkT) checkpointMark;
-      return source.createReader(options, mark);
-    }
-  }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
deleted file mode 100644
index dd2bfb1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.Values;
-import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
-import com.google.cloud.dataflow.sdk.transforms.WithKeys;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
- * {@link CreatePCollectionView} primitive {@link PTransform}.
- *
- * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for
- * the {@link WriteView} {@link PTransform}, which is part of the
- * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the
- * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is
- * written.
- */
-class ViewEvaluatorFactory implements TransformEvaluatorFactory {
-  /**
-   * Returns an evaluator for the provided application of {@link WriteView}. The input bundle is
-   * not used to construct the evaluator.
-   */
-  @Override
-  public <T> TransformEvaluator<T> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      InProcessPipelineRunner.CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createEvaluator(
-        (AppliedPTransform) application, evaluationContext);
-    return evaluator;
-  }
-
-  /**
-   * Creates an evaluator that buffers every value of each input {@code Iterable}, re-wrapped with
-   * {@code element.withValue(...)}, and writes all buffered values to the view's
-   * {@link PCollectionViewWriter} when the bundle finishes.
-   */
-  private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
-      final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>>
-          application,
-      InProcessEvaluationContext context) {
-    PCollection<Iterable<InT>> input = application.getInput();
-    final PCollectionViewWriter<InT, OuT> writer =
-        context.createPCollectionViewWriter(input, application.getOutput());
-    return new TransformEvaluator<Iterable<InT>>() {
-      private final List<WindowedValue<InT>> elements = new ArrayList<>();
-
-      @Override
-      public void processElement(WindowedValue<Iterable<InT>> element) {
-        for (InT value : element.getValue()) {
-          elements.add(element.withValue(value));
-        }
-      }
-
-      @Override
-      public InProcessTransformResult finishBundle() {
-        writer.add(elements);
-        return StepTransformResult.withoutHold(application).build();
-      }
-    };
-  }
-
-  /**
-   * An in-process override for {@link CreatePCollectionView}.
-   *
-   * <p>Keys every element with the same ({@code null}) key so that a {@link GroupByKey} collects
-   * them together, then writes the grouped values with a {@link WriteView}.
-   */
-  public static class InProcessCreatePCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
-
-    private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
-    }
-
-    @Override
-    public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
-      return input.apply(WithKeys.<Void, ElemT>of((Void) null))
-          .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
-          .apply(GroupByKey.<Void, ElemT>create())
-          .apply(Values.<Iterable<ElemT>>create())
-          .apply(new WriteView<ElemT, ViewT>(og));
-    }
-  }
-
-  /**
-   * An in-process implementation of the {@link CreatePCollectionView} primitive.
-   *
-   * <p>This implementation requires the input {@link PCollection} to be an iterable, which is
-   * provided to {@link PCollectionView#fromIterableInternal(Iterable)}.
-   */
-  public static final class WriteView<ElemT, ViewT>
-      extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
-    private final CreatePCollectionView<ElemT, ViewT> og;
-
-    WriteView(CreatePCollectionView<ElemT, ViewT> og) {
-      this.og = og;
-    }
-
-    /** Returns the view of the original {@link CreatePCollectionView} transform. */
-    @Override
-    public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) {
-      return og.getView();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
deleted file mode 100644
index 27d59b9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Ordering;
-
-import org.joda.time.Instant;
-
-import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Runs callbacks for a step once that step's watermark has advanced far enough.
- *
- * <p>A callback is registered with
- * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}
- * and is handed to an internal single-threaded executor by the first later call to
- * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same {@link AppliedPTransform}
- * whose watermark guarantees that the trigger of the windowing strategy would have fired.
- *
- * <p>NOTE: {@link WatermarkCallbackExecutor} does not remember the latest watermark of any
- * {@link AppliedPTransform}. A callback registered with
- * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}
- * for a window that may already have fired only runs on a later
- * {@link #fireForWatermark(AppliedPTransform, Instant)} call, so such a registration should be
- * followed by a call for the same transform with its current watermark.
- */
-class WatermarkCallbackExecutor {
-  /**
-   * Create a new {@link WatermarkCallbackExecutor}.
-   */
-  public static WatermarkCallbackExecutor create() {
-    return new WatermarkCallbackExecutor();
-  }
-
-  /** Pending callbacks per step, each queue ordered by the watermark the callback waits for. */
-  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>>
-      callbacks;
-  /** Runs eligible callbacks; backed by a single thread. */
-  private final ExecutorService executor;
-
-  private WatermarkCallbackExecutor() {
-    this.executor = Executors.newSingleThreadExecutor();
-    this.callbacks = new ConcurrentHashMap<>();
-  }
-
-  /**
-   * Registers {@code runnable} to be submitted on the first later call to
-   * {@link #fireForWatermark(AppliedPTransform, Instant)} for {@code step} whose watermark
-   * guarantees that {@code window} has produced output under {@code windowingStrategy}.
-   */
-  public void callOnGuaranteedFiring(
-      AppliedPTransform<?, ?, ?> step,
-      BoundedWindow window,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Runnable runnable) {
-    WatermarkCallback pending =
-        WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable);
-
-    PriorityQueue<WatermarkCallback> queue = callbacks.get(step);
-    if (queue == null) {
-      PriorityQueue<WatermarkCallback> candidate =
-          new PriorityQueue<>(11, new CallbackOrdering());
-      PriorityQueue<WatermarkCallback> winner = callbacks.putIfAbsent(step, candidate);
-      // putIfAbsent yields the queue another thread installed first, or null if ours won.
-      queue = (winner == null) ? candidate : winner;
-    }
-
-    synchronized (queue) {
-      queue.offer(pending);
-    }
-  }
-
-  /**
-   * Submits, in watermark order, every pending callback for {@code step} that is guaranteed to
-   * have produced output by {@code watermark}.
-   */
-  public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) {
-    PriorityQueue<WatermarkCallback> queue = callbacks.get(step);
-    if (queue != null) {
-      synchronized (queue) {
-        for (WatermarkCallback head = queue.peek();
-            head != null && head.shouldFire(watermark);
-            head = queue.peek()) {
-          queue.poll();
-          executor.submit(head.getCallback());
-        }
-      }
-    }
-  }
-
-  /** A callback paired with the watermark after which it becomes eligible to run. */
-  private static class WatermarkCallback {
-    public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
-        BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
-      @SuppressWarnings("unchecked")
-      Instant guaranteedBy =
-          strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window);
-      return new WatermarkCallback(guaranteedBy, callback);
-    }
-
-    private final Instant fireAfter;
-    private final Runnable callback;
-
-    private WatermarkCallback(Instant fireAfter, Runnable callback) {
-      this.callback = callback;
-      this.fireAfter = fireAfter;
-    }
-
-    /**
-     * Returns true when {@code currentWatermark} is strictly after {@link #fireAfter}, or is
-     * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
-     */
-    public boolean shouldFire(Instant currentWatermark) {
-      if (currentWatermark.isAfter(fireAfter)) {
-        return true;
-      }
-      return currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    }
-
-    public Runnable getCallback() {
-      return callback;
-    }
-  }
-
-  /**
-   * Orders callbacks by {@code fireAfter}, breaking ties between their runnables with
-   * {@link Ordering#arbitrary()}.
-   */
-  private static class CallbackOrdering extends Ordering<WatermarkCallback> {
-    @Override
-    public int compare(WatermarkCallback first, WatermarkCallback second) {
-      ComparisonChain chain = ComparisonChain.start().compare(first.fireAfter, second.fireAfter);
-      return chain.compare(first.callback, second.callback, Ordering.arbitrary()).result();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/package-info.java
deleted file mode 100644
index d1aa6af..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/package-info.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines runners for executing Pipelines in different modes, including
- * {@link com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner} and
- * {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
- *
- * <p>{@link com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner} executes a {@code Pipeline}
- * locally, without contacting the Dataflow service.
- * {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner} submits a
- * {@code Pipeline} to the Dataflow service, which executes it on Dataflow-managed Compute Engine
- * instances. {@code DataflowPipelineRunner} returns
- * as soon as the {@code Pipeline} has been submitted. Use
- * {@link com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner} to have execution
- * updates printed to the console.
- *
- * <p>The runner is specified as part of the {@link com.google.cloud.dataflow.sdk.options.PipelineOptions}.
- */
-package com.google.cloud.dataflow.sdk.runners;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/IsmFormat.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/IsmFormat.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/IsmFormat.java
deleted file mode 100644
index 318de9b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/IsmFormat.java
+++ /dev/null
@@ -1,946 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.worker;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.ListCoder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.RandomAccessData;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * An Ism file is a prefix-encoded composite key-value file broken into shards. Each composite
- * key is composed of a fixed number of component keys, the leading ones of which form the
- * shard key portion; see {@link IsmRecord} and {@link IsmRecordCoder} for further details
- * around the data format. In addition to the data, there is a bloom filter,
- * and multiple indices to allow for efficient retrieval.
- *
- * <p>An Ism file is composed of these high level sections (in order):
- * <ul>
- * <li>shard block</li>
- * <li>bloom filter (See {@code ScalableBloomFilter} for details on encoding format)</li>
- * <li>shard index</li>
- * <li>footer (See {@link Footer} for details on encoding format)</li>
- * </ul>
- *
- * <p>The shard block is composed of multiple copies of the following:
- * <ul>
- * <li>data block</li>
- * <li>data index</li>
- * </ul>
- *
- * <p>The data block is composed of multiple copies of the following:
- * <ul>
- * <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
- * <li>unshared key bytes</li>
- * <li>value bytes</li>
- * <li>optional 0x00 0x00 bytes followed by metadata bytes
- * (if the following 0x00 0x00 bytes are not present, then there are no metadata bytes)</li>
- * </ul>
- * Keys written into a data block must appear in increasing unsigned lexicographic order, and
- * the shard portion of each key must hash to the same shard id as all other keys within the
- * same data block. The hashing function used is the
- * <a href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">
- * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
- * using {@code 1225801234} as the seed value.
- *
- * <p>The data index is composed of {@code N} copies of the following:
- * <ul>
- * <li>key prefix (See {@link KeyPrefix} for details on encoding format)</li>
- * <li>unshared key bytes</li>
- * <li>byte offset to key prefix in data block (variable length long coding)</li>
- * </ul>
- *
- * <p>The shard index is composed of a {@link VarInt variable length integer} encoding representing
- * the number of shard index records followed by that many shard index records.
- * See {@link IsmShardCoder} for further details as to its encoding scheme.
- */
-public class IsmFormat {
- /** Seed for the murmur3 hash used to assign composite keys to shards. */
- private static final int HASH_SEED = 1225801234;
- /** 32-bit murmur3 hash (seeded with {@link #HASH_SEED}) applied to encoded shard keys. */
- private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32(HASH_SEED);
- /**
- * Mask applied to the key hash to obtain a shard id. Value records fall in [0-127]; metadata
- * records are offset by {@code SHARD_BITS + 1}, placing them in [128-255].
- */
- static final int SHARD_BITS = 0x7F; // [0-127] shards + [128-255] metadata shards
-
- /**
- * A record containing a composite key and either a value or metadata. The composite key
- * must not contain the metadata key component place holder if producing a value record, and must
- * contain the metadata component key place holder if producing a metadata record.
- *
- * <p>The composite key is a fixed number of component keys where the first {@code N} component
- * keys are used to create a shard id via hashing. See {@link IsmRecordCoder#hash(List)} for
- * further details.
- */
- public static class IsmRecord<V> {
- /** Returns an IsmRecord with the specified key components and value. */
- public static <V> IsmRecord<V> of(List<?> keyComponents, V value) {
- checkNotNull(keyComponents);
- checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
- checkArgument(!isMetadataKey(keyComponents),
- "Expected key components to not contain metadata key.");
- return new IsmRecord<>(keyComponents, value, null);
- }
-
- public static <V> IsmRecord<V> meta(List<?> keyComponents, byte[] metadata) {
- checkNotNull(keyComponents);
- checkNotNull(metadata);
- checkArgument(!keyComponents.isEmpty(), "Expected non-empty list of key components.");
- checkArgument(isMetadataKey(keyComponents),
- "Expected key components to contain metadata key.");
- return new IsmRecord<V>(keyComponents, null, metadata);
- }
-
- private final List<?> keyComponents;
- @Nullable
- private final V value;
- @Nullable
- private final byte[] metadata;
- private IsmRecord(List<?> keyComponents, V value, byte[] metadata) {
- this.keyComponents = keyComponents;
- this.value = value;
- this.metadata = metadata;
- }
-
- /** Returns the list of key components. */
- public List<?> getKeyComponents() {
- return keyComponents;
- }
-
- /** Returns the key component at the specified index. */
- public Object getKeyComponent(int index) {
- return keyComponents.get(index);
- }
-
- /**
- * Returns the value. Throws {@link IllegalStateException} if this is not a
- * value record.
- */
- public V getValue() {
- checkState(!isMetadataKey(keyComponents),
- "This is a metadata record and not a value record.");
- return value;
- }
-
- /**
- * Returns the metadata. Throws {@link IllegalStateException} if this is not a
- * metadata record.
- */
- public byte[] getMetadata() {
- checkState(isMetadataKey(keyComponents),
- "This is a value record and not a metadata record.");
- return metadata;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof IsmRecord)) {
- return false;
- }
- IsmRecord<?> other = (IsmRecord<?>) obj;
- return Objects.equal(keyComponents, other.keyComponents)
- && Objects.equal(value, other.value)
- && Arrays.equals(metadata, other.metadata);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(keyComponents, value, Arrays.hashCode(metadata));
- }
-
- @Override
- public String toString() {
- ToStringHelper builder = MoreObjects.toStringHelper(IsmRecord.class)
- .add("keyComponents", keyComponents);
- if (isMetadataKey(keyComponents)) {
- builder.add("metadata", metadata);
- } else {
- builder.add("value", value);
- }
- return builder.toString();
- }
- }
-
-  /**
-   * A {@link Coder} for {@link IsmRecord}s.
-   *
-   * <p>Note that this coder standalone will not produce an Ism file. This coder can be used
-   * to materialize a {@link PCollection} of {@link IsmRecord}s. Only when this coder
-   * is combined with an {@link IsmSink} will one produce an Ism file.
-   *
-   * <p>The {@link IsmRecord} encoded format is:
-   * <ul>
-   *   <li>encoded key component 1 using key component coder 1</li>
-   *   <li>...</li>
-   *   <li>encoded key component N using key component coder N</li>
-   *   <li>for value records, the value encoded using the value coder; for metadata records,
-   *       the metadata bytes encoded using {@link ByteArrayCoder}</li>
-   * </ul>
-   */
-  public static class IsmRecordCoder<V>
-      extends StandardCoder<IsmRecord<V>> {
-    /**
-     * Returns an IsmRecordCoder with the specified key component coders and value coder.
-     *
-     * @param numberOfShardKeyCoders how many leading key components are hashed to choose the
-     *     shard of a value record; must be positive and at most the number of key components
-     * @param numberOfMetadataShardKeyCoders how many leading key components are hashed to choose
-     *     the shard of a metadata record; at most the number of key components
-     * @param keyComponentCoders one coder per key component; must be non-null and non-empty
-     * @param valueCoder the coder used for the value of value records
-     * @throws IllegalArgumentException if any of the size constraints above is violated
-     */
-    public static <V> IsmRecordCoder<V> of(
-        int numberOfShardKeyCoders,
-        int numberOfMetadataShardKeyCoders,
-        List<Coder<?>> keyComponentCoders,
-        Coder<V> valueCoder) {
-      checkNotNull(keyComponentCoders);
-      checkArgument(keyComponentCoders.size() > 0);
-      checkArgument(numberOfShardKeyCoders > 0);
-      checkArgument(numberOfShardKeyCoders <= keyComponentCoders.size());
-      checkArgument(numberOfMetadataShardKeyCoders <= keyComponentCoders.size());
-      return new IsmRecordCoder<>(
-          numberOfShardKeyCoders,
-          numberOfMetadataShardKeyCoders,
-          keyComponentCoders,
-          valueCoder);
-    }
-
-    /**
-     * Returns an IsmRecordCoder with the specified coders. Note that this method is not meant
-     * to be called by users but used by Jackson when decoding this coder.
-     *
-     * <p>{@code components} holds the key component coders followed by the value coder, the
-     * same order produced by {@link #getCoderArguments()}.
-     */
-    @JsonCreator
-    public static IsmRecordCoder<?> of(
-        @JsonProperty(PropertyNames.NUM_SHARD_CODERS) int numberOfShardCoders,
-        @JsonProperty(PropertyNames.NUM_METADATA_SHARD_CODERS) int numberOfMetadataShardCoders,
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-      Preconditions.checkArgument(components.size() >= 2,
-          "Expecting at least 2 components, got " + components.size());
-      return of(
-          numberOfShardCoders,
-          numberOfMetadataShardCoders,
-          components.subList(0, components.size() - 1),
-          components.get(components.size() - 1));
-    }
-
-    private final int numberOfShardKeyCoders;
-    private final int numberOfMetadataShardKeyCoders;
-    private final List<Coder<?>> keyComponentCoders;
-    private final Coder<V> valueCoder;
-
-    private IsmRecordCoder(
-        int numberOfShardKeyCoders,
-        int numberOfMetadataShardKeyCoders,
-        List<Coder<?>> keyComponentCoders, Coder<V> valueCoder) {
-      this.numberOfShardKeyCoders = numberOfShardKeyCoders;
-      this.numberOfMetadataShardKeyCoders = numberOfMetadataShardKeyCoders;
-      this.keyComponentCoders = keyComponentCoders;
-      this.valueCoder = valueCoder;
-    }
-
-    /** Returns the list of key component coders. */
-    public List<Coder<?>> getKeyComponentCoders() {
-      return keyComponentCoders;
-    }
-
-    /**
-     * Returns the key coder at the specified index.
-     *
-     * <p>The raw return type is what lets {@link #encode} pass {@code Object}-typed key
-     * components to the coder without a cast.
-     */
-    public Coder getKeyComponentCoder(int index) {
-      return keyComponentCoders.get(index);
-    }
-
-    /** Returns the value coder. */
-    public Coder<V> getValueCoder() {
-      return valueCoder;
-    }
-
-    /**
-     * Encodes every key component with its coder, followed by the metadata bytes (for metadata
-     * records) or the value (for value records).
-     *
-     * @throws CoderException if the record does not have exactly one key component per key
-     *     component coder
-     */
-    @Override
-    public void encode(IsmRecord<V> value, OutputStream outStream,
-        Coder.Context context) throws CoderException, IOException {
-      if (value.getKeyComponents().size() != keyComponentCoders.size()) {
-        throw new CoderException(String.format(
-            "Expected %s key component(s) but received key component(s) %s.",
-            keyComponentCoders.size(), value.getKeyComponents()));
-      }
-      for (int i = 0; i < keyComponentCoders.size(); ++i) {
-        getKeyComponentCoder(i).encode(value.getKeyComponent(i), outStream, context.nested());
-      }
-      if (isMetadataKey(value.getKeyComponents())) {
-        ByteArrayCoder.of().encode(value.getMetadata(), outStream, context.nested());
-      } else {
-        valueCoder.encode(value.getValue(), outStream, context.nested());
-      }
-    }
-
-    /** Decodes a record in the format written by {@link #encode}. */
-    @Override
-    public IsmRecord<V> decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      List<Object> keyComponents = new ArrayList<>(keyComponentCoders.size());
-      for (Coder<?> keyCoder : keyComponentCoders) {
-        keyComponents.add(keyCoder.decode(inStream, context.nested()));
-      }
-      if (isMetadataKey(keyComponents)) {
-        return IsmRecord.<V>meta(
-            keyComponents, ByteArrayCoder.of().decode(inStream, context.nested()));
-      } else {
-        return IsmRecord.<V>of(keyComponents, valueCoder.decode(inStream, context.nested()));
-      }
-    }
-
-    /**
-     * Returns how many leading key components are hashed to compute the shard id for the given
-     * key components: the metadata count for metadata keys, otherwise the value count.
-     */
-    int getNumberOfShardKeyCoders(List<?> keyComponents) {
-      if (isMetadataKey(keyComponents)) {
-        return numberOfMetadataShardKeyCoders;
-      } else {
-        return numberOfShardKeyCoders;
-      }
-    }
-
-    /**
-     * Computes the shard id for the given key component(s).
-     *
-     * <p>The shard keys are encoded into their byte representations and hashed using the
-     * <a href="http://smhasher.googlecode.com/svn/trunk/MurmurHash3.cpp">
-     * 32-bit murmur3 algorithm, x86 variant</a> (little-endian variant),
-     * using {@code 1225801234} as the seed value. We ensure that shard ids for
-     * metadata keys and normal keys do not overlap.
-     */
-    public int hash(List<?> keyComponents) {
-      return encodeAndHash(keyComponents, new RandomAccessData(), new ArrayList<Integer>());
-    }
-
-    /**
-     * Computes the shard id for the given key component(s).
-     *
-     * <p>Mutates {@code keyBytesToMutate} such that, when this returns, it contains the encoded
-     * version of the key components.
-     */
-    int encodeAndHash(List<?> keyComponents, RandomAccessData keyBytesToMutate) {
-      return encodeAndHash(keyComponents, keyBytesToMutate, new ArrayList<Integer>());
-    }
-
-    /**
-     * Computes the shard id for the given key component(s).
-     *
-     * <p>Writes the encoding of every key component into {@code keyBytesToMutate}, and adds to
-     * {@code keyComponentByteOffsetsToMutate} the size of {@code keyBytesToMutate} after each
-     * component, i.e. where that component's encoding ends. The hash covers all bytes of
-     * {@code keyBytesToMutate} present once the shard key components have been written, so
-     * callers presumably pass an empty buffer. Value records map to shard ids in
-     * {@code [0, SHARD_BITS]} and metadata records to {@code [SHARD_BITS + 1, 2 * SHARD_BITS + 1]}.
-     *
-     * @throws IllegalArgumentException if there are more key components than key component
-     *     coders, or fewer key components than are needed to compute the shard id
-     * @throws IllegalStateException if encoding a key component fails
-     */
-    int encodeAndHash(
-        List<?> keyComponents,
-        RandomAccessData keyBytesToMutate,
-        List<Integer> keyComponentByteOffsetsToMutate) {
-      checkNotNull(keyComponents);
-      checkArgument(keyComponents.size() <= keyComponentCoders.size(),
-          "Expected at most %s key component(s) but received %s.",
-          keyComponentCoders.size(), keyComponents);
-
-      final int numberOfKeyCodersToUse;
-      final int shardOffset;
-      if (isMetadataKey(keyComponents)) {
-        numberOfKeyCodersToUse = numberOfMetadataShardKeyCoders;
-        // Shift metadata shards past the value shard range so the two never overlap.
-        shardOffset = SHARD_BITS + 1;
-      } else {
-        numberOfKeyCodersToUse = numberOfShardKeyCoders;
-        shardOffset = 0;
-      }
-
-      // Report the count that actually applies to this key (metadata or value).
-      checkArgument(numberOfKeyCodersToUse <= keyComponents.size(),
-          "Expected at least %s key component(s) but received %s.",
-          numberOfKeyCodersToUse, keyComponents);
-
-      try {
-        // Encode the shard portion; these bytes are hashed.
-        for (int i = 0; i < numberOfKeyCodersToUse; ++i) {
-          getKeyComponentCoder(i).encode(
-              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
-          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
-        }
-        int rval = HASH_FUNCTION.hashBytes(
-            keyBytesToMutate.array(), 0, keyBytesToMutate.size()).asInt() & SHARD_BITS;
-        rval += shardOffset;
-
-        // Encode the remainder; these bytes do not affect the shard id.
-        for (int i = numberOfKeyCodersToUse; i < keyComponents.size(); ++i) {
-          getKeyComponentCoder(i).encode(
-              keyComponents.get(i), keyBytesToMutate.asOutputStream(), Context.NESTED);
-          keyComponentByteOffsetsToMutate.add(keyBytesToMutate.size());
-        }
-        return rval;
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            String.format("Failed to hash %s with coder %s", keyComponents, this), e);
-      }
-    }
-
-    /** Returns the key component coders followed by the value coder. */
-    @Override
-    public List<Coder<?>> getCoderArguments() {
-      return ImmutableList.<Coder<?>>builder()
-          .addAll(keyComponentCoders)
-          .add(valueCoder)
-          .build();
-    }
-
-    /**
-     * Adds the shard key coder counts under the same property names read by the Jackson
-     * {@link #of(int, int, List)} factory.
-     */
-    @Override
-    public CloudObject asCloudObject() {
-      CloudObject cloudObject = super.asCloudObject();
-      addLong(cloudObject, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
-      addLong(cloudObject, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
-      return cloudObject;
-    }
-
-    @Override
-    public void verifyDeterministic() throws Coder.NonDeterministicException {
-      verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
-      verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
-    }
-
-    /** Returns true only if every key component coder and the value coder are consistent. */
-    @Override
-    public boolean consistentWithEquals() {
-      for (Coder<?> keyComponentCoder : keyComponentCoders) {
-        if (!keyComponentCoder.consistentWithEquals()) {
-          return false;
-        }
-      }
-      return valueCoder.consistentWithEquals();
-    }
-
-    /**
-     * When this coder is consistent with equals, returns a record built from the structural
-     * values of each key component (and of the value, for value records); otherwise defers to
-     * the superclass.
-     *
-     * @throws NullPointerException if {@code record} is null
-     * @throws IllegalStateException if the record's key component count differs from the number
-     *     of key component coders
-     */
-    @Override
-    public Object structuralValue(IsmRecord<V> record) throws Exception {
-      // Reject null up front; the size check below would otherwise dereference it.
-      checkNotNull(record);
-      checkState(record.getKeyComponents().size() == keyComponentCoders.size(),
-          "Expected the number of key component coders %s "
-          + "to match the number of key components %s.",
-          keyComponentCoders.size(), record.getKeyComponents());
-
-      if (!consistentWithEquals()) {
-        return super.structuralValue(record);
-      }
-      List<Object> keyComponentStructuralValues = new ArrayList<>(keyComponentCoders.size());
-      for (int i = 0; i < keyComponentCoders.size(); ++i) {
-        keyComponentStructuralValues.add(
-            getKeyComponentCoder(i).structuralValue(record.getKeyComponent(i)));
-      }
-      if (isMetadataKey(record.getKeyComponents())) {
-        return IsmRecord.meta(keyComponentStructuralValues, record.getMetadata());
-      }
-      return IsmRecord.of(keyComponentStructuralValues,
-          valueCoder.structuralValue(record.getValue()));
-    }
-  }
-
-  /**
-   * Checks that every key component coder of {@code coder} is deterministic, as required for
-   * the key bytes to be hashed and ordered.
-   *
-   * @throws IllegalArgumentException wrapping the {@link NonDeterministicException} raised by
-   *     the first key component coder that fails {@link Coder#verifyDeterministic()}
-   */
-  static void validateCoderIsCompatible(IsmRecordCoder<?> coder) {
-    for (Coder<?> componentCoder : coder.getKeyComponentCoders()) {
-      requireDeterministicKeyCoder(componentCoder);
-    }
-  }
-
-  /** Throws {@link IllegalArgumentException} if {@code componentCoder} is not deterministic. */
-  private static void requireDeterministicKeyCoder(Coder<?> componentCoder) {
-    try {
-      componentCoder.verifyDeterministic();
-    } catch (NonDeterministicException cause) {
-      String message = String.format(
-          "Key component coder %s is expected to be deterministic.", componentCoder);
-      throw new IllegalArgumentException(message, cause);
-    }
-  }
-
- /** Returns true if and only if any of the passed in key components represent a metadata key. */
- public static boolean isMetadataKey(List<?> keyComponents) {
- for (Object keyComponent : keyComponents) {
- if (keyComponent == METADATA_KEY) {
- return true;
- }
- }
- return false;
- }
-
-  /**
-   * Singleton placeholder that stands in for a key component (a wildcard) to mark metadata
-   * records. It is equal only to itself, prints as {@code META}, and has a fixed hash code.
-   */
-  private static final Object METADATA_KEY = new Object() {
-    @Override
-    public int hashCode() {
-      return -1248902349;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      return other == this;
-    }
-
-    @Override
-    public String toString() {
-      return "META";
-    }
-  };
-
-  /**
-   * Returns the wildcard placeholder key component that marks a record as a metadata record.
-   * {@link MetadataKeyCoder} encodes it as a single {@code 0} marker byte.
-   */
-  public static Object getMetadataKey() {
-    return IsmFormat.METADATA_KEY;
-  }
-
-  /**
-   * A coder for a key component that may be the metadata key placeholder. It wraps the coder of
-   * the real key component so that the placeholder returned by {@link #getMetadataKey()} can be
-   * used in place of an actual key.
-   *
-   * <p>The encoded form is a single marker byte, followed by the nested encoding of the key only
-   * when it is a real key:
-   * <ul>
-   *   <li>{@code 0x00}: the metadata key placeholder (no further bytes)</li>
-   *   <li>{@code 0x01}: a real key, followed by its encoding with the wrapped coder</li>
-   * </ul>
-   */
-  public static class MetadataKeyCoder<K> extends StandardCoder<K> {
-    /** Returns a MetadataKeyCoder wrapping the given non-null key coder. */
-    public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
-      checkNotNull(keyCoder);
-      return new MetadataKeyCoder<>(keyCoder);
-    }
-
-    /**
-     * Returns a MetadataKeyCoder with the specified coders. Note that this method is not meant
-     * to be called by users but used by Jackson when decoding this coder.
-     *
-     * @throws IllegalArgumentException if {@code components} does not hold exactly one coder
-     */
-    @JsonCreator
-    public static MetadataKeyCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-      Preconditions.checkArgument(components.size() == 1,
-          "Expecting one component, got " + components.size());
-      return of(components.get(0));
-    }
-
-    private final Coder<K> keyCoder;
-
-    private MetadataKeyCoder(Coder<K> keyCoder) {
-      this.keyCoder = keyCoder;
-    }
-
-    /** Returns the wrapped coder used for real (non-placeholder) keys. */
-    public Coder<K> getKeyCoder() {
-      return keyCoder;
-    }
-
-    @Override
-    public void encode(K value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      if (value == METADATA_KEY) {
-        outStream.write(0);
-      } else {
-        outStream.write(1);
-        keyCoder.encode(value, outStream, context.nested());
-      }
-    }
-
-    /**
-     * Decodes a key written by {@link #encode}.
-     *
-     * @throws CoderException if the marker byte is neither {@code 0} nor {@code 1} (including
-     *     {@code -1} at end of stream)
-     */
-    @Override
-    public K decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      int marker = inStream.read();
-      if (marker == 0) {
-        // Erased cast: the placeholder is a plain Object returned as K.
-        @SuppressWarnings("unchecked")
-        K metadataKey = (K) getMetadataKey();
-        return metadataKey;
-      } else if (marker == 1) {
-        return keyCoder.decode(inStream, context.nested());
-      } else {
-        throw new CoderException(String.format("Expected marker but got %s.", marker));
-      }
-    }
-
-    @Override
-    public List<Coder<?>> getCoderArguments() {
-      return ImmutableList.<Coder<?>>of(keyCoder);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic("Expected key coder to be deterministic", keyCoder);
-    }
-  }
-
-  /**
-   * A shard descriptor containing the shard id, the data block offset, and the index offset for
-   * the given shard. Instances are immutable. The index offset may be left unspecified (see
-   * {@link #of(int, long)}) and supplied later through {@link #withIndexOffset(long)}.
-   */
-  public static class IsmShard {
-    private final int id;
-    private final long blockOffset;
-    /** Absolute index offset, or {@code -1} when it has not been specified. */
-    private final long indexOffset;
-
-    /**
-     * Returns an IsmShard with the given id, block offset and no index offset.
-     *
-     * @throws IllegalStateException if {@code id} or {@code blockOffset} is negative
-     */
-    public static IsmShard of(int id, long blockOffset) {
-      return checkIdAndBlockOffset(new IsmShard(id, blockOffset, -1));
-    }
-
-    /**
-     * Returns an IsmShard with the given id, block offset, and index offset.
-     *
-     * @throws IllegalStateException if {@code id}, {@code blockOffset} or {@code indexOffset}
-     *     is negative
-     */
-    public static IsmShard of(int id, long blockOffset, long indexOffset) {
-      IsmShard ismShard = checkIdAndBlockOffset(new IsmShard(id, blockOffset, indexOffset));
-      checkState(indexOffset >= 0,
-          "%s attempting to be written with negative index offset.",
-          ismShard);
-      return ismShard;
-    }
-
-    /**
-     * Checks the fields shared by both factory methods, throwing {@link IllegalStateException}
-     * if the shard id or block offset is negative. Returns {@code ismShard} for chaining.
-     */
-    private static IsmShard checkIdAndBlockOffset(IsmShard ismShard) {
-      checkState(ismShard.id >= 0,
-          "%s attempting to be written with negative shard id.",
-          ismShard);
-      checkState(ismShard.blockOffset >= 0,
-          "%s attempting to be written with negative block offset.",
-          ismShard);
-      return ismShard;
-    }
-
-    private IsmShard(int id, long blockOffset, long indexOffset) {
-      this.id = id;
-      this.blockOffset = blockOffset;
-      this.indexOffset = indexOffset;
-    }
-
-    /** Return the shard id. */
-    public int getId() {
-      return id;
-    }
-
-    /** Return the absolute position within the Ism file where the data block begins. */
-    public long getBlockOffset() {
-      return blockOffset;
-    }
-
-    /**
-     * Return the absolute position within the Ism file where the index block begins.
-     *
-     * @throws IllegalStateException if the index offset was never specified
-     */
-    public long getIndexOffset() {
-      checkState(indexOffset >= 0,
-          "Unable to fetch index offset because it was never specified.");
-      return indexOffset;
-    }
-
-    /** Returns a new IsmShard like this one with the specified index offset. */
-    public IsmShard withIndexOffset(long indexOffset) {
-      return of(id, blockOffset, indexOffset);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(IsmShard.class)
-          .add("id", id)
-          .add("blockOffset", blockOffset)
-          .add("indexOffset", indexOffset)
-          .toString();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (!(obj instanceof IsmShard)) {
-        return false;
-      }
-      IsmShard other = (IsmShard) obj;
-      // Compare the primitives directly rather than boxing them through Objects.equal.
-      return id == other.id
-          && blockOffset == other.blockOffset
-          && indexOffset == other.indexOffset;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(id, blockOffset, indexOffset);
-    }
-  }
-
-  /**
-   * Coder for the shard index: a {@link ListCoder} of {@link IsmShard}s, each element encoded
-   * with {@link IsmShardCoder}. Those two coders define the exact byte layout.
-   */
-  public static final Coder<List<IsmShard>> ISM_SHARD_INDEX_CODER = ListCoder.of(IsmShardCoder.of());
-
- /**
- * A coder for {@link IsmShard}s.
- *
- * The shard descriptor is encoded as:
- * <ul>
- * <li>id (variable length integer encoding)</li>
- * <li>blockOffset (variable length long encoding)</li>
- * <li>indexOffset (variable length long encoding)</li>
- * </ul>
- */
- public static class IsmShardCoder extends AtomicCoder<IsmShard> {
- private static final IsmShardCoder INSTANCE = new IsmShardCoder();
-
- /** Returns an IsmShardCoder. */
- @JsonCreator
- public static IsmShardCoder of() {
- return INSTANCE;
- }
-
- private IsmShardCoder() {
- }
-
- @Override
- public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- checkState(value.getIndexOffset() >= 0,
- "%s attempting to be written without index offset.",
- value);
- VarIntCoder.of().encode(value.getId(), outStream, context.nested());
- VarLongCoder.of().encode(value.getBlockOffset(), outStream, context.nested());
- VarLongCoder.of().encode(value.getIndexOffset(), outStream, context.nested());
- }
-
- @Override
- public IsmShard decode(
- InputStream inStream, Coder.Context context) throws CoderException, IOException {
- return IsmShard.of(
- VarIntCoder.of().decode(inStream, context),
- VarLongCoder.of().decode(inStream, context),
- VarLongCoder.of().decode(inStream, context));
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
- }
-
-  /**
-   * The prefix written before each key. It records how many leading bytes the key shares with
-   * the previously read key and how many bytes that follow are not shared. Combined with the
-   * previous key and the unshared bytes, it lets the current key be rebuilt as
-   * {@code currentKey = previousKey[0 : sharedBytes] + read(unsharedBytes)}.
-   *
-   * <p>The key prefix is encoded as:
-   * <ul>
-   *   <li>number of shared key bytes (variable length integer coding)</li>
-   *   <li>number of unshared key bytes (variable length integer coding)</li>
-   * </ul>
-   */
-  static class KeyPrefix {
-    private final int sharedKeySize;
-    private final int unsharedKeySize;
-
-    KeyPrefix(int sharedBytes, int unsharedBytes) {
-      sharedKeySize = sharedBytes;
-      unsharedKeySize = unsharedBytes;
-    }
-
-    /** Number of leading bytes shared with the previous key. */
-    public int getSharedKeySize() {
-      return sharedKeySize;
-    }
-
-    /** Number of key bytes that follow and are not shared with the previous key. */
-    public int getUnsharedKeySize() {
-      return unsharedKeySize;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("sharedKeySize", sharedKeySize)
-          .add("unsharedKeySize", unsharedKeySize)
-          .toString();
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof KeyPrefix)) {
-        return false;
-      }
-      KeyPrefix that = (KeyPrefix) other;
-      return this == that
-          || (sharedKeySize == that.sharedKeySize && unsharedKeySize == that.unsharedKeySize);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(sharedKeySize, unsharedKeySize);
-    }
-  }
-
- /** A {@link Coder} for {@link KeyPrefix}. */
- static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
- private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
-
- @JsonCreator
- public static KeyPrefixCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(KeyPrefix value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- VarInt.encode(value.sharedKeySize, outStream);
- VarInt.encode(value.unsharedKeySize, outStream);
- }
-
- @Override
- public KeyPrefix decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- return new KeyPrefix(VarInt.decodeInt(inStream), VarInt.decodeInt(inStream));
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- @Override
- public boolean isRegisterByteSizeObserverCheap(KeyPrefix value, Coder.Context context) {
- return true;
- }
-
- @Override
- protected long getEncodedElementByteSize(KeyPrefix value, Coder.Context context)
- throws Exception {
- Preconditions.checkNotNull(value);
- return VarInt.getLength(value.sharedKeySize) + VarInt.getLength(value.unsharedKeySize);
- }
- }
-
-  /**
-   * The footer stores the relevant information required to locate the index and bloom filter.
-   * It also stores a version byte and the number of keys stored.
-   *
-   * <p>The footer is encoded (see {@link FooterCoder}) as the value containing, in order:
-   * <ul>
-   *   <li>start of shard index position offset (big endian long coding)</li>
-   *   <li>start of bloom filter offset (big endian long coding)</li>
-   *   <li>number of keys in file (big endian long coding)</li>
-   *   <li>0x02 (version key as a single byte, see {@link #VERSION})</li>
-   * </ul>
-   *
-   * <p>The encoding is always {@link #FIXED_LENGTH} bytes long.
-   */
-  static class Footer {
-    /** Size in bytes of each big-endian long field. */
-    static final int LONG_BYTES = 8;
-    /** Three long fields plus the single version byte. */
-    static final int FIXED_LENGTH = 3 * LONG_BYTES + 1;
-    /** The only footer version written and accepted. */
-    static final byte VERSION = 2;
-
-    private final long indexPosition;
-    private final long bloomFilterPosition;
-    private final long numberOfKeys;
-
-    Footer(long indexPosition, long bloomFilterPosition, long numberOfKeys) {
-      this.indexPosition = indexPosition;
-      this.bloomFilterPosition = bloomFilterPosition;
-      this.numberOfKeys = numberOfKeys;
-    }
-
-    /** Returns the absolute position of the shard index within the file. */
-    public long getIndexPosition() {
-      return indexPosition;
-    }
-
-    /** Returns the absolute position of the bloom filter within the file. */
-    public long getBloomFilterPosition() {
-      return bloomFilterPosition;
-    }
-
-    /** Returns the number of keys stored in the file. */
-    public long getNumberOfKeys() {
-      return numberOfKeys;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == this) {
-        return true;
-      }
-      if (!(other instanceof Footer)) {
-        return false;
-      }
-      Footer footer = (Footer) other;
-      return indexPosition == footer.indexPosition
-          && bloomFilterPosition == footer.bloomFilterPosition
-          && numberOfKeys == footer.numberOfKeys;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(indexPosition, bloomFilterPosition, numberOfKeys);
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("version", Footer.VERSION)
-          .add("indexPosition", indexPosition)
-          .add("bloomFilterPosition", bloomFilterPosition)
-          .add("numberOfKeys", numberOfKeys)
-          .toString();
-    }
-  }
-
-  /**
-   * {@link Coder} for {@link Footer}. It writes the index position, bloom filter position and
-   * key count as big-endian longs, followed by the {@link Footer#VERSION} byte. Decoding rejects
-   * any other version.
-   */
-  static final class FooterCoder extends AtomicCoder<Footer> {
-    private static final FooterCoder INSTANCE = new FooterCoder();
-
-    /** Returns the shared FooterCoder instance. */
-    @JsonCreator
-    public static FooterCoder of() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void encode(Footer value, OutputStream outStream, Coder.Context context)
-        throws CoderException, IOException {
-      DataOutputStream out = new DataOutputStream(outStream);
-      long[] fields = {value.indexPosition, value.bloomFilterPosition, value.numberOfKeys};
-      for (long field : fields) {
-        out.writeLong(field);
-      }
-      out.write(Footer.VERSION);
-    }
-
-    /**
-     * Decodes a footer written by {@link #encode}.
-     *
-     * @throws IOException if the trailing version byte is not {@link Footer#VERSION}
-     */
-    @Override
-    public Footer decode(InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      DataInputStream in = new DataInputStream(inStream);
-      long indexPosition = in.readLong();
-      long bloomFilterPosition = in.readLong();
-      long numberOfKeys = in.readLong();
-      int version = in.read();
-      if (version != Footer.VERSION) {
-        throw new IOException("Unknown version " + version + ". "
-            + "Only version 2 is currently supported.");
-      }
-      return new Footer(indexPosition, bloomFilterPosition, numberOfKeys);
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return true;
-    }
-
-    /** The encoding has a fixed size, so observing it is cheap. */
-    @Override
-    public boolean isRegisterByteSizeObserverCheap(Footer value, Coder.Context context) {
-      return true;
-    }
-
-    @Override
-    protected long getEncodedElementByteSize(Footer value, Coder.Context context)
-        throws Exception {
-      return Footer.FIXED_LENGTH;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/package-info.java
deleted file mode 100644
index af0a345..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Implementation of the harness that runs on each Google Compute Engine instance to coordinate
- * execution of Pipeline code.
- */
-@ParametersAreNonnullByDefault
-package com.google.cloud.dataflow.sdk.runners.worker;
-
-import javax.annotation.ParametersAreNonnullByDefault;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/CoderProperties.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/CoderProperties.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/CoderProperties.java
deleted file mode 100644
index 5705dc4..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/testing/CoderProperties.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.testing;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.Serializer;
-import com.google.cloud.dataflow.sdk.util.Structs;
-import com.google.cloud.dataflow.sdk.util.UnownedInputStream;
-import com.google.cloud.dataflow.sdk.util.UnownedOutputStream;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Properties for use in {@link Coder} tests. These are implemented with junit assertions
- * rather than as predicates for the sake of error messages.
- *
- * <p>We serialize and deserialize the coder to make sure that any state information required by
- * the coder is preserved. This causes tests written such that coders that lose information during
- * serialization or change state during encoding/decoding will fail.
- */
-public class CoderProperties {
-
- /**
- * All the contexts, for use in test cases.
- */
- public static final List<Coder.Context> ALL_CONTEXTS = Arrays.asList(
- Coder.Context.OUTER, Coder.Context.NESTED);
-
- /**
- * Verifies that for the given {@code Coder<T>}, and values of
- * type {@code T}, if the values are equal then the encoded bytes are equal, in any
- * {@code Coder.Context}.
- */
- public static <T> void coderDeterministic(
- Coder<T> coder, T value1, T value2)
- throws Exception {
- for (Coder.Context context : ALL_CONTEXTS) {
- coderDeterministicInContext(coder, context, value1, value2);
- }
- }
-
- /**
- * Verifies that for the given {@code Coder<T>}, {@code Coder.Context}, and values of
- * type {@code T}, if the values are equal then the encoded bytes are equal.
- */
- public static <T> void coderDeterministicInContext(
- Coder<T> coder, Coder.Context context, T value1, T value2)
- throws Exception {
-
- try {
- coder.verifyDeterministic();
- } catch (NonDeterministicException e) {
- fail("Expected that the coder is deterministic");
- }
- assertThat("Expected that the passed in values are equal()", value1, equalTo(value2));
- assertThat(
- encode(coder, context, value1),
- equalTo(encode(coder, context, value2)));
- }
-
- /**
- * Verifies that for the given {@code Coder<T>},
- * and value of type {@code T}, encoding followed by decoding yields an
- * equal value of type {@code T}, in any {@code Coder.Context}.
- */
- public static <T> void coderDecodeEncodeEqual(
- Coder<T> coder, T value)
- throws Exception {
- for (Coder.Context context : ALL_CONTEXTS) {
- coderDecodeEncodeEqualInContext(coder, context, value);
- }
- }
-
- /**
- * Verifies that for the given {@code Coder<T>}, {@code Coder.Context},
- * and value of type {@code T}, encoding followed by decoding yields an
- * equal value of type {@code T}.
- */
- public static <T> void coderDecodeEncodeEqualInContext(
- Coder<T> coder, Coder.Context context, T value)
- throws Exception {
- assertThat(decodeEncode(coder, context, value), equalTo(value));
- }
-
- /**
- * Verifies that for the given {@code Coder<Collection<T>>},
- * and value of type {@code Collection<T>}, encoding followed by decoding yields an
- * equal value of type {@code Collection<T>}, in any {@code Coder.Context}.
- */
- public static <T, CollectionT extends Collection<T>> void coderDecodeEncodeContentsEqual(
- Coder<CollectionT> coder, CollectionT value)
- throws Exception {
- for (Coder.Context context : ALL_CONTEXTS) {
- coderDecodeEncodeContentsEqualInContext(coder, context, value);
- }
- }
-
- /**
- * Verifies that for the given {@code Coder<Collection<T>>},
- * and value of type {@code Collection<T>}, encoding followed by decoding yields an
- * equal value of type {@code Collection<T>}, in the given {@code Coder.Context}.
- */
- @SuppressWarnings("unchecked")
- public static <T, CollectionT extends Collection<T>> void coderDecodeEncodeContentsEqualInContext(
- Coder<CollectionT> coder, Coder.Context context, CollectionT value)
- throws Exception {
- // Matchers.containsInAnyOrder() requires at least one element
- Collection<T> result = decodeEncode(coder, context, value);
- if (value.isEmpty()) {
- assertThat(result, emptyIterable());
- } else {
- // This is the only Matchers.containInAnyOrder() overload that takes literal values
- assertThat(result, containsInAnyOrder((T[]) value.toArray()));
- }
- }
-
- /**
- * Verifies that for the given {@code Coder<Collection<T>>},
- * and value of type {@code Collection<T>}, encoding followed by decoding yields an
- * equal value of type {@code Collection<T>}, in any {@code Coder.Context}.
- */
- public static <T, IterableT extends Iterable<T>> void coderDecodeEncodeContentsInSameOrder(
- Coder<IterableT> coder, IterableT value)
- throws Exception {
- for (Coder.Context context : ALL_CONTEXTS) {
- CoderProperties.<T, IterableT>coderDecodeEncodeContentsInSameOrderInContext(
- coder, context, value);
- }
- }
-
- /**
- * Verifies that for the given {@code Coder<Iterable<T>>},
- * and value of type {@code Iterable<T>}, encoding followed by decoding yields an
- * equal value of type {@code Collection<T>}, in the given {@code Coder.Context}.
- */
- @SuppressWarnings("unchecked")
- public static <T, IterableT extends Iterable<T>> void
- coderDecodeEncodeContentsInSameOrderInContext(
- Coder<IterableT> coder, Coder.Context context, IterableT value)
- throws Exception {
- Iterable<T> result = decodeEncode(coder, context, value);
- // Matchers.contains() requires at least one element
- if (Iterables.isEmpty(value)) {
- assertThat(result, emptyIterable());
- } else {
- // This is the only Matchers.contains() overload that takes literal values
- assertThat(result, contains((T[]) Iterables.toArray(value, Object.class)));
- }
- }
-
- public static <T> void coderSerializable(Coder<T> coder) {
- SerializableUtils.ensureSerializable(coder);
- }
-
- public static <T> void coderConsistentWithEquals(
- Coder<T> coder, T value1, T value2)
- throws Exception {
-
- for (Coder.Context context : ALL_CONTEXTS) {
- CoderProperties.<T>coderConsistentWithEqualsInContext(coder, context, value1, value2);
- }
- }
-
- public static <T> void coderConsistentWithEqualsInContext(
- Coder<T> coder, Coder.Context context, T value1, T value2) throws Exception {
-
- assertEquals(
- value1.equals(value2),
- Arrays.equals(
- encode(coder, context, value1),
- encode(coder, context, value2)));
- }
-
- public static <T> void coderHasEncodingId(Coder<T> coder, String encodingId) throws Exception {
- assertThat(coder.getEncodingId(), equalTo(encodingId));
- assertThat(Structs.getString(coder.asCloudObject(), PropertyNames.ENCODING_ID, ""),
- equalTo(encodingId));
- }
-
- public static <T> void coderAllowsEncoding(Coder<T> coder, String encodingId) throws Exception {
- assertThat(coder.getAllowedEncodings(), hasItem(encodingId));
- assertThat(
- String.format("Expected to find \"%s\" in property \"%s\" of %s",
- encodingId, PropertyNames.ALLOWED_ENCODINGS, coder.asCloudObject()),
- Structs.getStrings(
- coder.asCloudObject(),
- PropertyNames.ALLOWED_ENCODINGS,
- Collections.<String>emptyList()),
- hasItem(encodingId));
- }
-
- public static <T> void structuralValueConsistentWithEquals(
- Coder<T> coder, T value1, T value2)
- throws Exception {
-
- for (Coder.Context context : ALL_CONTEXTS) {
- CoderProperties.<T>structuralValueConsistentWithEqualsInContext(
- coder, context, value1, value2);
- }
- }
-
- public static <T> void structuralValueConsistentWithEqualsInContext(
- Coder<T> coder, Coder.Context context, T value1, T value2) throws Exception {
-
- assertEquals(
- coder.structuralValue(value1).equals(coder.structuralValue(value2)),
- Arrays.equals(
- encode(coder, context, value1),
- encode(coder, context, value2)));
- }
-
-
- private static final String DECODING_WIRE_FORMAT_MESSAGE =
- "Decoded value from known wire format does not match expected value."
- + " This probably means that this Coder no longer correctly decodes"
- + " a prior wire format. Changing the wire formats this Coder can read"
- + " should be avoided, as it is likely to cause breakage."
- + " If you truly intend to change the backwards compatibility for this Coder "
- + " then you must remove any now-unsupported encodings from getAllowedEncodings().";
-
- public static <T> void coderDecodesBase64(Coder<T> coder, String base64Encoding, T value)
- throws Exception {
- assertThat(DECODING_WIRE_FORMAT_MESSAGE, CoderUtils.decodeFromBase64(coder, base64Encoding),
- equalTo(value));
- }
-
- public static <T> void coderDecodesBase64(
- Coder<T> coder, List<String> base64Encodings, List<T> values) throws Exception {
- assertThat("List of base64 encodings has different size than List of values",
- base64Encodings.size(), equalTo(values.size()));
-
- for (int i = 0; i < base64Encodings.size(); i++) {
- coderDecodesBase64(coder, base64Encodings.get(i), values.get(i));
- }
- }
-
- private static final String ENCODING_WIRE_FORMAT_MESSAGE =
- "Encoded value does not match expected wire format."
- + " Changing the wire format should be avoided, as it is likely to cause breakage."
- + " If you truly intend to change the wire format for this Coder "
- + " then you must update getEncodingId() to a new value and add any supported"
- + " prior formats to getAllowedEncodings()."
- + " See com.google.cloud.dataflow.sdk.coders.PrintBase64Encoding for how to generate"
- + " new test data.";
-
- public static <T> void coderEncodesBase64(Coder<T> coder, T value, String base64Encoding)
- throws Exception {
- assertThat(ENCODING_WIRE_FORMAT_MESSAGE, CoderUtils.encodeToBase64(coder, value),
- equalTo(base64Encoding));
- }
-
- public static <T> void coderEncodesBase64(
- Coder<T> coder, List<T> values, List<String> base64Encodings) throws Exception {
- assertThat("List of base64 encodings has different size than List of values",
- base64Encodings.size(), equalTo(values.size()));
-
- for (int i = 0; i < base64Encodings.size(); i++) {
- coderEncodesBase64(coder, values.get(i), base64Encodings.get(i));
- }
- }
-
- @SuppressWarnings("unchecked")
- public static <T, IterableT extends Iterable<T>> void coderDecodesBase64ContentsEqual(
- Coder<IterableT> coder, String base64Encoding, IterableT expected) throws Exception {
-
- IterableT result = CoderUtils.decodeFromBase64(coder, base64Encoding);
- if (Iterables.isEmpty(expected)) {
- assertThat(ENCODING_WIRE_FORMAT_MESSAGE, result, emptyIterable());
- } else {
- assertThat(ENCODING_WIRE_FORMAT_MESSAGE, result,
- containsInAnyOrder((T[]) Iterables.toArray(expected, Object.class)));
- }
- }
-
- public static <T, IterableT extends Iterable<T>> void coderDecodesBase64ContentsEqual(
- Coder<IterableT> coder, List<String> base64Encodings, List<IterableT> expected)
- throws Exception {
- assertThat("List of base64 encodings has different size than List of values",
- base64Encodings.size(), equalTo(expected.size()));
-
- for (int i = 0; i < base64Encodings.size(); i++) {
- coderDecodesBase64ContentsEqual(coder, base64Encodings.get(i), expected.get(i));
- }
- }
-
- //////////////////////////////////////////////////////////////////////////
-
- @VisibleForTesting
- static <T> byte[] encode(
- Coder<T> coder, Coder.Context context, T value) throws CoderException, IOException {
- @SuppressWarnings("unchecked")
- Coder<T> deserializedCoder = Serializer.deserialize(coder.asCloudObject(), Coder.class);
-
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- deserializedCoder.encode(value, new UnownedOutputStream(os), context);
- return os.toByteArray();
- }
-
- @VisibleForTesting
- static <T> T decode(
- Coder<T> coder, Coder.Context context, byte[] bytes) throws CoderException, IOException {
- @SuppressWarnings("unchecked")
- Coder<T> deserializedCoder = Serializer.deserialize(coder.asCloudObject(), Coder.class);
-
- ByteArrayInputStream is = new ByteArrayInputStream(bytes);
- return deserializedCoder.decode(new UnownedInputStream(is), context);
- }
-
- private static <T> T decodeEncode(Coder<T> coder, Coder.Context context, T value)
- throws CoderException, IOException {
- return decode(coder, context, encode(coder, context, value));
- }
-}