Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:34 UTC
[10/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
deleted file mode 100644
index 8b2d56f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.hash.Funnels;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.io.CountingOutputStream;
-import com.google.common.io.Files;
-
-import com.fasterxml.jackson.core.Base64Variants;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-/** Helper routines for packages. */
-public class PackageUtil {
- private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
- /**
- * A reasonable upper bound on the number of jars required to launch a Dataflow job.
- */
- public static final int SANE_CLASSPATH_SIZE = 1000;
- /**
- * The initial interval to use between package staging attempts.
- */
- private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L;
- /**
- * The maximum number of attempts when staging a file.
- */
- private static final int MAX_ATTEMPTS = 5;
-
- /**
- * Translates exceptions from API calls.
- */
- private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
-
- /**
- * Creates a DataflowPackage containing information about how a classpath element should be
- * staged, including the staging destination as well as its size and hash.
- *
- * @param classpathElement The local path for the classpath element.
- * @param stagingPath The base location for staged classpath elements.
- * @param overridePackageName If non-null, use the given value as the package name
- * instead of generating one automatically.
- * @return The package.
- */
- @Deprecated
- public static DataflowPackage createPackage(File classpathElement,
- String stagingPath, String overridePackageName) {
- return createPackageAttributes(classpathElement, stagingPath, overridePackageName)
- .getDataflowPackage();
- }
-
- /**
- * Computes and caches the attributes of a classpath element that are needed to stage it.
- *
- * @param classpathElement the file or directory to be staged.
- * @param stagingPath The base location for staged classpath elements.
- * @param overridePackageName If non-null, use the given value as the package name
- * instead of generating one automatically.
- * @return a {@link PackageAttributes} that contains metadata about the object to be staged.
- */
- static PackageAttributes createPackageAttributes(File classpathElement,
- String stagingPath, String overridePackageName) {
- try {
- boolean directory = classpathElement.isDirectory();
-
- // Compute size and hash in one pass over file or directory.
- Hasher hasher = Hashing.md5().newHasher();
- OutputStream hashStream = Funnels.asOutputStream(hasher);
- CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream);
-
- if (!directory) {
- // Files are staged as-is.
- Files.asByteSource(classpathElement).copyTo(countingOutputStream);
- } else {
- // Directories are recursively zipped.
- ZipFiles.zipDirectory(classpathElement, countingOutputStream);
- }
-
- long size = countingOutputStream.getCount();
- String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-
- // Create the DataflowPackage with staging name and location.
- String uniqueName = getUniqueContentName(classpathElement, hash);
- String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
- DataflowPackage target = new DataflowPackage();
- target.setName(overridePackageName != null ? overridePackageName : uniqueName);
- target.setLocation(resourcePath);
-
- return new PackageAttributes(size, hash, directory, target);
- } catch (IOException e) {
- throw new RuntimeException("Package setup failure for " + classpathElement, e);
- }
- }
-
- /**
- * Transfers the classpath elements to the staging location.
- *
- * @param classpathElements The elements to stage.
- * @param stagingPath The base location to stage the elements to.
- * @return A list of cloud workflow packages, each representing a classpath element.
- */
- public static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, String stagingPath) {
- return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT);
- }
-
- // Visible for testing.
- static List<DataflowPackage> stageClasspathElements(
- Collection<String> classpathElements, String stagingPath,
- Sleeper retrySleeper) {
- LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to "
- + "prepare for execution.", classpathElements.size());
-
- if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
- LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically "
- + "copies to all workers. Having this many entries on your classpath may be indicative "
- + "of an issue in your pipeline. You may want to consider trimming the classpath to "
- + "necessary dependencies only, using --filesToStage pipeline option to override "
- + "what files are being staged, or bundling several dependencies into one.",
- classpathElements.size());
- }
-
- ArrayList<DataflowPackage> packages = new ArrayList<>();
-
- if (stagingPath == null) {
- throw new IllegalArgumentException(
- "Can't stage classpath elements on because no staging location has been provided");
- }
-
- int numUploaded = 0;
- int numCached = 0;
- for (String classpathElement : classpathElements) {
- String packageName = null;
- if (classpathElement.contains("=")) {
- String[] components = classpathElement.split("=", 2);
- packageName = components[0];
- classpathElement = components[1];
- }
-
- File file = new File(classpathElement);
- if (!file.exists()) {
- LOG.warn("Skipping non-existent classpath element {} that was specified.",
- classpathElement);
- continue;
- }
-
- PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName);
-
- DataflowPackage workflowPackage = attributes.getDataflowPackage();
- packages.add(workflowPackage);
- String target = workflowPackage.getLocation();
-
- // TODO: Should we attempt to detect the Mime type rather than
- // always using MimeTypes.BINARY?
- try {
- try {
- long remoteLength = IOChannelUtils.getSizeBytes(target);
- if (remoteLength == attributes.getSize()) {
- LOG.debug("Skipping classpath element already staged: {} at {}",
- classpathElement, target);
- numCached++;
- continue;
- }
- } catch (FileNotFoundException expected) {
- // If the file doesn't exist, it means we need to upload it.
- }
-
- // Upload file, retrying on failure.
- AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_ATTEMPTS,
- INITIAL_BACKOFF_INTERVAL_MS);
- while (true) {
- try {
- LOG.debug("Uploading classpath element {} to {}", classpathElement, target);
- try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) {
- copyContent(classpathElement, writer);
- }
- numUploaded++;
- break;
- } catch (IOException e) {
- if (ERROR_EXTRACTOR.accessDenied(e)) {
- String errorMessage = String.format(
- "Uploaded failed due to permissions error, will NOT retry staging "
- + "of classpath %s. Please verify credentials are valid and that you have "
- + "write access to %s. Stale credentials can be resolved by executing "
- + "'gcloud auth login'.", classpathElement, target);
- LOG.error(errorMessage);
- throw new IOException(errorMessage, e);
- } else if (!backoff.atMaxAttempts()) {
- LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}",
- classpathElement, e);
- BackOffUtils.next(retrySleeper, backoff);
- } else {
- // Rethrow last error, to be included as a cause in the catch below.
- LOG.error("Upload failed, will NOT retry staging of classpath: {}",
- classpathElement, e);
- throw e;
- }
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("Could not stage classpath element: " + classpathElement, e);
- }
- }
-
- LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, "
- + "{} files cached",
- numUploaded, numCached);
-
- return packages;
- }
-
- /**
- * Returns a unique name for a file with a given content hash.
- *
- * <p>Directory paths are removed. Example:
- * <pre>
- * dir="a/b/c/d", contentHash="f000" => d-f000.jar
- * file="a/b/c/d.txt", contentHash="f000" => d-f000.txt
- * file="a/b/c/d", contentHash="f000" => d-f000
- * </pre>
- */
- static String getUniqueContentName(File classpathElement, String contentHash) {
- String fileName = Files.getNameWithoutExtension(classpathElement.getAbsolutePath());
- String fileExtension = Files.getFileExtension(classpathElement.getAbsolutePath());
- if (classpathElement.isDirectory()) {
- return fileName + "-" + contentHash + ".jar";
- } else if (fileExtension.isEmpty()) {
- return fileName + "-" + contentHash;
- }
- return fileName + "-" + contentHash + "." + fileExtension;
- }
-
- /**
- * Copies the contents of the classpathElement to the output channel.
- *
- * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly,
- * otherwise the file contents are copied as-is.
- *
- * <p>The output channel is not closed.
- */
- private static void copyContent(String classpathElement, WritableByteChannel outputChannel)
- throws IOException {
- final File classpathElementFile = new File(classpathElement);
- if (classpathElementFile.isDirectory()) {
- ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel));
- } else {
- Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel));
- }
- }
-
- /**
- * Holds the metadata necessary to stage a file or confirm that a staged file has not changed.
- */
- static class PackageAttributes {
- private final boolean directory;
- private final long size;
- private final String hash;
- private DataflowPackage dataflowPackage;
-
- public PackageAttributes(long size, String hash, boolean directory,
- DataflowPackage dataflowPackage) {
- this.size = size;
- this.hash = Objects.requireNonNull(hash, "hash");
- this.directory = directory;
- this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
- }
-
- /**
- * @return the dataflowPackage
- */
- public DataflowPackage getDataflowPackage() {
- return dataflowPackage;
- }
-
- /**
- * @return the directory
- */
- public boolean isDirectory() {
- return directory;
- }
-
- /**
- * @return the size
- */
- public long getSize() {
- return size;
- }
-
- /**
- * @return the hash
- */
- public String getHash() {
- return hash;
- }
- }
-}
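
For reference, the naming scheme implemented by getUniqueContentName above can be
exercised on its own. Below is a minimal, self-contained sketch; the class name
UniqueNameDemo is illustrative and not part of the SDK, while Files is Guava's
com.google.common.io.Files, exactly as in the deleted file.

import com.google.common.io.Files;

import java.io.File;

public class UniqueNameDemo {
  // Mirrors getUniqueContentName: directories are staged as jars, files keep
  // their extension, and the content hash is embedded before the extension.
  static String uniqueName(File element, String contentHash) {
    String base = Files.getNameWithoutExtension(element.getAbsolutePath());
    String extension = Files.getFileExtension(element.getAbsolutePath());
    if (element.isDirectory()) {
      return base + "-" + contentHash + ".jar";
    } else if (extension.isEmpty()) {
      return base + "-" + contentHash;
    }
    return base + "-" + contentHash + "." + extension;
  }

  public static void main(String[] args) {
    System.out.println(uniqueName(new File("a/b/c/d.txt"), "f000")); // d-f000.txt
    System.out.println(uniqueName(new File("a/b/c/d"), "f000"));     // d-f000
  }
}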
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
deleted file mode 100644
index a7818a3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.Timing;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-import com.google.cloud.dataflow.sdk.util.state.ValueState;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Instant;
-
-/**
- * Determine the timing and other properties of a new pane for a given computation, key and window.
- * Incorporates any previous pane, whether the pane was produced by an on-time
- * {@link AfterWatermark} trigger firing, and the relation between the element's timestamp
- * and the current output watermark.
- */
-public class PaneInfoTracker {
- private TimerInternals timerInternals;
-
- public PaneInfoTracker(TimerInternals timerInternals) {
- this.timerInternals = timerInternals;
- }
-
- @VisibleForTesting
- static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
- StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
-
- public void clear(StateAccessor<?> state) {
- state.access(PANE_INFO_TAG).clear();
- }
-
- /**
- * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
- * info includes the timing for the pane, whose calculation is quite subtle.
- *
- * @param isFinal should be {@code true} only if the triggering machinery can guarantee
- * no further firings for the window.
- */
- public ReadableState<PaneInfo> getNextPaneInfo(
- ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
- final Object key = context.key();
- final ReadableState<PaneInfo> previousPaneFuture =
- context.state().access(PaneInfoTracker.PANE_INFO_TAG);
- final Instant windowMaxTimestamp = context.window().maxTimestamp();
-
- return new ReadableState<PaneInfo>() {
- @Override
- public ReadableState<PaneInfo> readLater() {
- previousPaneFuture.readLater();
- return this;
- }
-
- @Override
- public PaneInfo read() {
- PaneInfo previousPane = previousPaneFuture.read();
- return describePane(key, windowMaxTimestamp, previousPane, isFinal);
- }
- };
- }
-
- public void storeCurrentPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, PaneInfo currentPane) {
- context.state().access(PANE_INFO_TAG).write(currentPane);
- }
-
- private <W> PaneInfo describePane(
- Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
- boolean isFirst = previousPane == null;
- Timing previousTiming = isFirst ? null : previousPane.getTiming();
- long index = isFirst ? 0 : previousPane.getIndex() + 1;
- long nonSpeculativeIndex = isFirst ? 0 : previousPane.getNonSpeculativeIndex() + 1;
- Instant outputWM = timerInternals.currentOutputWatermarkTime();
- Instant inputWM = timerInternals.currentInputWatermarkTime();
-
- // True if it is not possible to assign the element representing this pane a timestamp
- // which will make an ON_TIME pane for any following computation.
- // I.e., true if the element's latest possible timestamp is before the current output watermark.
- boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
-
- // True if all emitted panes (if any) were EARLY panes.
- // Once the ON_TIME pane has fired, all following panes must be considered LATE even
- // if the output watermark is behind the end of the window.
- boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
-
- // True if the input watermark hasn't passed the window's max timestamp.
- boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
-
- Timing timing;
- if (isLateForOutput || !onlyEarlyPanesSoFar) {
- // The output watermark has already passed the end of this window, or we have already
- // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
- // consider this pane LATE.
- timing = Timing.LATE;
- } else if (isEarlyForInput) {
- // This is an EARLY firing.
- timing = Timing.EARLY;
- nonSpeculativeIndex = -1;
- } else {
- // This is the unique ON_TIME firing for the window.
- timing = Timing.ON_TIME;
- }
-
- WindowTracing.debug(
- "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
- + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
- timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);
-
- if (previousPane != null) {
- // Timing transitions should follow EARLY* ON_TIME? LATE*
- switch (previousTiming) {
- case EARLY:
- Preconditions.checkState(
- timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
- "EARLY cannot transition to %s", timing);
- break;
- case ON_TIME:
- Preconditions.checkState(
- timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
- break;
- case LATE:
- Preconditions.checkState(timing == Timing.LATE, "LATE cannot transtion to %s", timing);
- break;
- case UNKNOWN:
- break;
- }
- Preconditions.checkState(!previousPane.isLast(), "Last pane was not last after all.");
- }
-
- return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
- }
-}
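
The timing decision in describePane above can be restated as a small pure function.
The following sketch condenses the same three conditions into the EARLY/ON_TIME/LATE
classification; PaneTimingDemo is a hypothetical name, not an SDK class.

import org.joda.time.Instant;

public class PaneTimingDemo {
  enum Timing { EARLY, ON_TIME, LATE }

  // Same decision as describePane: LATE once the output watermark has passed the
  // window or a non-EARLY pane has already fired; EARLY while the input watermark
  // has not passed the window; otherwise the unique ON_TIME firing.
  static Timing classify(Instant inputWM, Instant outputWM,
      Instant windowMaxTimestamp, boolean onlyEarlyPanesSoFar) {
    boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
    boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
    if (isLateForOutput || !onlyEarlyPanesSoFar) {
      return Timing.LATE;
    } else if (isEarlyForInput) {
      return Timing.EARLY;
    }
    return Timing.ON_TIME;
  }
}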
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PathValidator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PathValidator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PathValidator.java
deleted file mode 100644
index 658de2a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PathValidator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-/**
- * Interface for controlling validation of paths.
- */
-public interface PathValidator {
- /**
- * Validate that a file pattern is conforming.
- *
- * @param filepattern The file pattern to verify.
- * @return The post-validation filepattern.
- */
- public String validateInputFilePatternSupported(String filepattern);
-
- /**
- * Validate that an output file prefix is conforming.
- *
- * @param filePrefix the file prefix to verify.
- * @return The post-validation filePrefix.
- */
- public String validateOutputFilePrefixSupported(String filePrefix);
-
- /**
- * Validate that a path is well formed and that it is accessible.
- *
- * @param path The path to verify.
- * @return The post-validation path.
- */
- public String verifyPath(String path);
-}
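
As a usage illustration only, the simplest conforming implementation of this
interface accepts every path unchanged. NoopValidator below is a hypothetical
name; a real validator (for example, one restricted to gs:// paths) would check
the scheme and accessibility before returning.

import com.google.cloud.dataflow.sdk.util.PathValidator;

public class NoopValidator implements PathValidator {
  @Override
  public String validateInputFilePatternSupported(String filepattern) {
    return filepattern; // a real validator would reject unsupported schemes here
  }

  @Override
  public String validateOutputFilePrefixSupported(String filePrefix) {
    return filePrefix;
  }

  @Override
  public String verifyPath(String path) {
    return path; // a real validator would also confirm the path is accessible
  }
}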
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunner.java
deleted file mode 100644
index b5f328f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunner.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * An interface that runs a {@link PerKeyCombineFn} with unified APIs.
- *
- * <p>Different keyed combine functions have their own implementations.
- * For example, the implementation can skip allocating {@code Combine.Context}
- * if the keyed combine function doesn't use it.
- */
-public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable {
- /**
- * Returns the {@link PerKeyCombineFn} it holds.
- *
- * <p>It can be a {@code KeyedCombineFn} or a {@code KeyedCombineFnWithContext}.
- */
- public PerKeyCombineFn<K, InputT, AccumT, OutputT> fn();
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link DoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
- * if it is required.
- */
- public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link DoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
- * if it is required.
- */
- public AccumT addInput(K key, AccumT accumulator, InputT input, DoFn<?, ?>.ProcessContext c);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link DoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
- * if it is required.
- */
- public AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link DoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
- * if it is required.
- */
- public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator in a {@link DoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
- * if it is required.
- */
- public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to combine the inputs and extract output
- * in a {@link DoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
- * if it is required.
- */
- public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in a {@link DoFn}.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
- * if it is required.
- */
- public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c);
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- public AccumT createAccumulator(K key, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to add the input.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- public AccumT addInput(K key, AccumT accumulator, InputT value, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to extract the output.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-
- /**
- * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator.
- *
- * <p>It constructs a {@code CombineWithContext.Context} from
- * {@link PipelineOptions} and {@link SideInputReader} if it is required.
- */
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunners.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunners.java
deleted file mode 100644
index 6606c54..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.RequiresContextInternal;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.common.collect.Iterables;
-
-import java.util.Collection;
-
-/**
- * Static utility methods that provide {@link PerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-public class PerKeyCombineFnRunners {
- /**
- * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
- */
- public static <K, InputT, AccumT, OutputT> PerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
- create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
- if (perKeyCombineFn instanceof RequiresContextInternal) {
- return new KeyedCombineFnWithContextRunner<>(
- (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
- } else {
- return new KeyedCombineFnRunner<>(
- (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
- }
- }
-
- /**
- * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
- *
- * It forwards function calls to the {@link KeyedCombineFn}.
- */
- private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
- private KeyedCombineFnRunner(
- KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
- this.keyedCombineFn = keyedCombineFn;
- }
-
- @Override
- public KeyedCombineFn<K, InputT, AccumT, OutputT> fn() {
- return keyedCombineFn;
- }
-
- @Override
- public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(
- K key, AccumT accumulator, InputT input, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.addInput(key, accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.extractOutput(key, accumulator);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.compact(key, accumulator);
- }
-
- @Override
- public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFn.apply(key, inputs);
- }
-
- @Override
- public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c) {
- AccumT accum = keyedCombineFn.createAccumulator(key);
- for (InputT input : inputs) {
- accum = keyedCombineFn.addInput(key, accum, input);
- }
- return accum;
- }
-
- @Override
- public String toString() {
- return keyedCombineFn.toString();
- }
-
- @Override
- public AccumT createAccumulator(K key, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.createAccumulator(key);
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.addInput(key, accumulator, input);
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.mergeAccumulators(key, accumulators);
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.extractOutput(key, accumulator);
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFn.compact(key, accumulator);
- }
- }
-
- /**
- * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
- *
- * It forwards function calls to the {@link KeyedCombineFnWithContext}.
- */
- private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
- implements PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
- private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
- private KeyedCombineFnWithContextRunner(
- KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
- this.keyedCombineFnWithContext = keyedCombineFnWithContext;
- }
-
- @Override
- public KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> fn() {
- return keyedCombineFnWithContext;
- }
-
- @Override
- public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.createAccumulator(key,
- CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public AccumT addInput(
- K key, AccumT accumulator, InputT value, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.addInput(key, accumulator, value,
- CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.mergeAccumulators(
- key, accumulators, CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.extractOutput(key, accumulator,
- CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.compact(key, accumulator,
- CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c) {
- return keyedCombineFnWithContext.apply(key, inputs,
- CombineContextFactory.createFromProcessContext(c));
- }
-
- @Override
- public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c) {
- CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c);
- AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext);
- for (InputT input : inputs) {
- accum = keyedCombineFnWithContext.addInput(key, accum, input, combineContext);
- }
- return accum;
- }
-
- @Override
- public String toString() {
- return keyedCombineFnWithContext.toString();
- }
-
- @Override
- public AccumT createAccumulator(K key, PipelineOptions options, SideInputReader sideInputReader,
- Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.createAccumulator(key,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT addInput(K key, AccumT accumulator, InputT input, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.addInput(key, accumulator, input,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.mergeAccumulators(key, accumulators,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.extractOutput(key, accumulator,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
-
- @Override
- public AccumT compact(K key, AccumT accumulator, PipelineOptions options,
- SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) {
- return keyedCombineFnWithContext.compact(key, accumulator,
- CombineContextFactory.createFromComponents(
- options, sideInputReader, Iterables.getOnlyElement(windows)));
- }
- }
-}
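
A hedged usage sketch of the factory above: the hypothetical per-key integer sum
below does not implement RequiresContextInternal, so create() hands back a
KeyedCombineFnRunner that never allocates a CombineWithContext.Context. The set
of abstract KeyedCombineFn methods overridden here is assumed from the SDK.

import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunner;
import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunners;

import java.util.Arrays;

public class RunnerDemo {
  public static void main(String[] args) {
    // A minimal, hypothetical per-key integer sum.
    KeyedCombineFn<String, Integer, Integer, Integer> sumPerKey =
        new KeyedCombineFn<String, Integer, Integer, Integer>() {
          @Override public Integer createAccumulator(String key) { return 0; }
          @Override public Integer addInput(String key, Integer accum, Integer value) {
            return accum + value;
          }
          @Override public Integer mergeAccumulators(String key, Iterable<Integer> accums) {
            int sum = 0;
            for (int a : accums) { sum += a; }
            return sum;
          }
          @Override public Integer extractOutput(String key, Integer accum) { return accum; }
        };

    // create() dispatches on RequiresContextInternal and, for this plain
    // KeyedCombineFn, returns a runner that simply ignores any context.
    PerKeyCombineFnRunner<String, Integer, Integer, Integer> runner =
        PerKeyCombineFnRunners.create(sumPerKey);
    System.out.println(sumPerKey.apply("k", Arrays.asList(1, 2, 3))); // prints 6
  }
}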
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
deleted file mode 100644
index 81572ea..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-/**
- * Constant property names used by the SDK in CloudWorkflow specifications.
- */
-public class PropertyNames {
- public static final String ALLOWED_ENCODINGS = "allowed_encodings";
- public static final String APPEND_TRAILING_NEWLINES = "append_trailing_newlines";
- public static final String BIGQUERY_CREATE_DISPOSITION = "create_disposition";
- public static final String BIGQUERY_DATASET = "dataset";
- public static final String BIGQUERY_PROJECT = "project";
- public static final String BIGQUERY_SCHEMA = "schema";
- public static final String BIGQUERY_TABLE = "table";
- public static final String BIGQUERY_QUERY = "bigquery_query";
- public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results";
- public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition";
- public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format";
- public static final String BIGQUERY_EXPORT_SCHEMA = "bigquery_export_schema";
- public static final String CO_GBK_RESULT_SCHEMA = "co_gbk_result_schema";
- public static final String COMBINE_FN = "combine_fn";
- public static final String COMPONENT_ENCODINGS = "component_encodings";
- public static final String COMPRESSION_TYPE = "compression_type";
- public static final String CUSTOM_SOURCE_FORMAT = "custom_source";
- public static final String CONCAT_SOURCE_SOURCES = "sources";
- public static final String CONCAT_SOURCE_BASE_SPECS = "base_specs";
- public static final String SOURCE_STEP_INPUT = "custom_source_step_input";
- public static final String SOURCE_SPEC = "spec";
- public static final String SOURCE_METADATA = "metadata";
- public static final String SOURCE_DOES_NOT_NEED_SPLITTING = "does_not_need_splitting";
- public static final String SOURCE_PRODUCES_SORTED_KEYS = "produces_sorted_keys";
- public static final String SOURCE_IS_INFINITE = "is_infinite";
- public static final String SOURCE_ESTIMATED_SIZE_BYTES = "estimated_size_bytes";
- public static final String ELEMENT = "element";
- public static final String ELEMENTS = "elements";
- public static final String ENCODING = "encoding";
- public static final String ENCODING_ID = "encoding_id";
- public static final String END_INDEX = "end_index";
- public static final String END_OFFSET = "end_offset";
- public static final String END_SHUFFLE_POSITION = "end_shuffle_position";
- public static final String ENVIRONMENT_VERSION_JOB_TYPE_KEY = "job_type";
- public static final String ENVIRONMENT_VERSION_MAJOR_KEY = "major";
- public static final String FILENAME = "filename";
- public static final String FILENAME_PREFIX = "filename_prefix";
- public static final String FILENAME_SUFFIX = "filename_suffix";
- public static final String FILEPATTERN = "filepattern";
- public static final String FOOTER = "footer";
- public static final String FORMAT = "format";
- public static final String HEADER = "header";
- public static final String INPUTS = "inputs";
- public static final String INPUT_CODER = "input_coder";
- public static final String IS_GENERATED = "is_generated";
- public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn";
- public static final String IS_PAIR_LIKE = "is_pair_like";
- public static final String IS_STREAM_LIKE = "is_stream_like";
- public static final String IS_WRAPPER = "is_wrapper";
- public static final String DISALLOW_COMBINER_LIFTING = "disallow_combiner_lifting";
- public static final String NON_PARALLEL_INPUTS = "non_parallel_inputs";
- public static final String NUM_SHARD_CODERS = "num_shard_coders";
- public static final String NUM_METADATA_SHARD_CODERS = "num_metadata_shard_coders";
- public static final String NUM_SHARDS = "num_shards";
- public static final String OBJECT_TYPE_NAME = "@type";
- public static final String OUTPUT = "output";
- public static final String OUTPUT_INFO = "output_info";
- public static final String OUTPUT_NAME = "output_name";
- public static final String PARALLEL_INPUT = "parallel_input";
- public static final String PHASE = "phase";
- public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
- public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
- public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";
- public static final String PUBSUB_TOPIC = "pubsub_topic";
- public static final String SCALAR_FIELD_NAME = "value";
- public static final String SERIALIZED_FN = "serialized_fn";
- public static final String SHARD_NAME_TEMPLATE = "shard_template";
- public static final String SHUFFLE_KIND = "shuffle_kind";
- public static final String SHUFFLE_READER_CONFIG = "shuffle_reader_config";
- public static final String SHUFFLE_WRITER_CONFIG = "shuffle_writer_config";
- public static final String SORT_VALUES = "sort_values";
- public static final String START_INDEX = "start_index";
- public static final String START_OFFSET = "start_offset";
- public static final String START_SHUFFLE_POSITION = "start_shuffle_position";
- public static final String STRIP_TRAILING_NEWLINES = "strip_trailing_newlines";
- public static final String TUPLE_TAGS = "tuple_tags";
- public static final String USE_INDEXED_FORMAT = "use_indexed_format";
- public static final String USER_FN = "user_fn";
- public static final String USER_NAME = "user_name";
- public static final String USES_KEYED_STATE = "uses_keyed_state";
- public static final String VALIDATE_SINK = "validate_sink";
- public static final String VALIDATE_SOURCE = "validate_source";
- public static final String VALUE = "value";
- public static final String DISPLAY_DATA = "display_data";
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RandomAccessData.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RandomAccessData.java
deleted file mode 100644
index 6c96c8e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/RandomAccessData.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.common.base.MoreObjects;
-import com.google.common.io.ByteStreams;
-import com.google.common.primitives.UnsignedBytes;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Comparator;
-
-import javax.annotation.concurrent.NotThreadSafe;
-
-/**
- * An elastic-sized byte array which allows you to manipulate it as a stream, or access
- * it directly. This allows bytes to be moved quickly from an {@link InputStream}
- * into this wrapper acting as an {@link OutputStream}, and vice versa. This wrapper
- * also provides random access to the bytes stored within, and allows users to finely
- * control the number of byte copies that occur.
- *
- * Anything stored within the in-memory buffer from offset {@link #size()} is considered temporary
- * unused storage.
- */
-@NotThreadSafe
-public class RandomAccessData {
- /**
- * A {@link Coder} which encodes the valid parts of this stream.
- * This follows the same encoding scheme as {@link ByteArrayCoder}.
- * This coder is deterministic and consistent with equals.
- *
- * This coder does not support encoding positive infinity.
- */
- public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
- private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
-
- @JsonCreator
- public static RandomAccessDataCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(RandomAccessData value, OutputStream outStream, Coder.Context context)
- throws CoderException, IOException {
- if (value == POSITIVE_INFINITY) {
- throw new CoderException("Positive infinity can not be encoded.");
- }
- if (!context.isWholeStream) {
- VarInt.encode(value.size, outStream);
- }
- value.writeTo(outStream, 0, value.size);
- }
-
- @Override
- public RandomAccessData decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- RandomAccessData rval = new RandomAccessData();
- if (!context.isWholeStream) {
- int length = VarInt.decodeInt(inStream);
- rval.readFrom(inStream, 0, length);
- } else {
- ByteStreams.copy(inStream, rval.asOutputStream());
- }
- return rval;
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- @Override
- public boolean isRegisterByteSizeObserverCheap(
- RandomAccessData value, Coder.Context context) {
- return true;
- }
-
- @Override
- protected long getEncodedElementByteSize(RandomAccessData value, Coder.Context context)
- throws Exception {
- if (value == null) {
- throw new CoderException("cannot encode a null in memory stream");
- }
- long size = 0;
- if (!context.isWholeStream) {
- size += VarInt.getLength(value.size);
- }
- return size + value.size;
- }
- }
-
- public static final UnsignedLexicographicalComparator UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =
- new UnsignedLexicographicalComparator();
-
- /**
- * A {@link Comparator} that compares two byte arrays lexicographically, treating
- * them as lists of unsigned bytes. The result is determined by the first pair of
- * values that follow any common prefix; if one array is a prefix of the other, the
- * shorter array is the lesser.
- * For example, [] < [0x01] < [0x01, 0x7F] < [0x01, 0x80] < [0x02] < POSITIVE INFINITY.
- *
- * <p>Note that a token type of positive infinity is supported and is greater than
- * all other {@link RandomAccessData}.
- */
- public static final class UnsignedLexicographicalComparator
- implements Comparator<RandomAccessData> {
- // Do not instantiate
- private UnsignedLexicographicalComparator() {
- }
-
- @Override
- public int compare(RandomAccessData o1, RandomAccessData o2) {
- return compare(o1, o2, 0 /* start from the beginning */);
- }
-
- /**
- * Compare the two sets of bytes starting at the given offset.
- */
- public int compare(RandomAccessData o1, RandomAccessData o2, int startOffset) {
- if (o1 == o2) {
- return 0;
- }
- if (o1 == POSITIVE_INFINITY) {
- return 1;
- }
- if (o2 == POSITIVE_INFINITY) {
- return -1;
- }
-
- int minBytesLen = Math.min(o1.size, o2.size);
- for (int i = startOffset; i < minBytesLen; i++) {
- // unsigned comparison
- int b1 = o1.buffer[i] & 0xFF;
- int b2 = o2.buffer[i] & 0xFF;
- if (b1 == b2) {
- continue;
- }
- // Return the stream with the smaller byte as the smaller value.
- return b1 - b2;
- }
- // If one is a prefix of the other, return the shorter one as the smaller one.
- // If both lengths are equal, then both streams are equal.
- return o1.size - o2.size;
- }
-
- /**
- * Compute the length of the common prefix of the two provided sets of bytes.
- */
- public int commonPrefixLength(RandomAccessData o1, RandomAccessData o2) {
- int minBytesLen = Math.min(o1.size, o2.size);
- for (int i = 0; i < minBytesLen; i++) {
- // unsigned comparison
- int b1 = o1.buffer[i] & 0xFF;
- int b2 = o2.buffer[i] & 0xFF;
- if (b1 != b2) {
- return i;
- }
- }
- return minBytesLen;
- }
- }
-
- /** A token type representing positive infinity. */
- static final RandomAccessData POSITIVE_INFINITY = new RandomAccessData(0);
-
- /**
- * Returns a RandomAccessData that is the smallest value of the same length which
- * is strictly greater than this. Note that if this is empty or is all 0xFF then
- * a token value of positive infinity is returned.
- *
- * The {@link UnsignedLexicographicalComparator} supports comparing {@link RandomAccessData}
- * values, including the positive-infinity token.
- */
- public RandomAccessData increment() throws IOException {
- RandomAccessData copy = copy();
- for (int i = copy.size - 1; i >= 0; --i) {
- if (copy.buffer[i] != UnsignedBytes.MAX_VALUE) {
- copy.buffer[i] = UnsignedBytes.checkedCast(UnsignedBytes.toInt(copy.buffer[i]) + 1);
- return copy;
- }
- }
- return POSITIVE_INFINITY;
- }
-
- private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;
-
- /** Constructs a RandomAccessData with a default buffer size. */
- public RandomAccessData() {
- this(DEFAULT_INITIAL_BUFFER_SIZE);
- }
-
- /** Constructs a RandomAccessData with the initial buffer. */
- public RandomAccessData(byte[] initialBuffer) {
- checkNotNull(initialBuffer);
- this.buffer = initialBuffer;
- this.size = initialBuffer.length;
- }
-
- /** Constructs a RandomAccessData with the given buffer size. */
- public RandomAccessData(int initialBufferSize) {
- checkArgument(initialBufferSize >= 0, "Expected initial buffer size to be non-negative.");
- this.buffer = new byte[initialBufferSize];
- }
-
- private byte[] buffer;
- private int size;
-
- /** Returns the backing array. */
- public byte[] array() {
- return buffer;
- }
-
- /** Returns the number of bytes in the backing array that are valid. */
- public int size() {
- return size;
- }
-
- /** Resets the end of the stream to the specified position. */
- public void resetTo(int position) {
- ensureCapacity(position);
- size = position;
- }
-
- private final OutputStream outputStream = new OutputStream() {
- @Override
- public void write(int b) throws IOException {
- ensureCapacity(size + 1);
- buffer[size] = (byte) b;
- size += 1;
- }
-
- @Override
- public void write(byte[] b, int offset, int length) throws IOException {
- ensureCapacity(size + length);
- System.arraycopy(b, offset, buffer, size, length);
- size += length;
- }
- };
-
- /**
- * Returns an output stream which writes to the backing buffer from the current position.
- * Note that the internal buffer will grow as required to accommodate all data written.
- */
- public OutputStream asOutputStream() {
- return outputStream;
- }
-
- /**
- * Returns an {@link InputStream} wrapper which supplies the portion of this backing byte buffer
- * starting at {@code offset} and up to {@code length} bytes. Note that the returned
- * {@link InputStream} is only a wrapper and any modifications to the underlying
- * {@link RandomAccessData} will be visible by the {@link InputStream}.
- */
- public InputStream asInputStream(final int offset, final int length) {
- return new ByteArrayInputStream(buffer, offset, length);
- }
-
- /**
- * Writes {@code length} bytes starting at {@code offset} from the backing data store to the
- * specified output stream.
- */
- public void writeTo(OutputStream out, int offset, int length) throws IOException {
- out.write(buffer, offset, length);
- }
-
- /**
- * Reads {@code length} bytes from the specified input stream writing them into the backing
- * data store starting at {@code offset}.
- *
- * <p>Note that the in-memory stream will be grown to ensure there is enough capacity.
- */
- public void readFrom(InputStream inStream, int offset, int length) throws IOException {
- ensureCapacity(offset + length);
- ByteStreams.readFully(inStream, buffer, offset, length);
- size = offset + length;
- }
-
- /** Returns a copy of this RandomAccessData. */
- public RandomAccessData copy() throws IOException {
- RandomAccessData copy = new RandomAccessData(size);
- writeTo(copy.asOutputStream(), 0, size);
- return copy;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == this) {
- return true;
- }
- if (!(other instanceof RandomAccessData)) {
- return false;
- }
- return UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(this, (RandomAccessData) other) == 0;
- }
-
- @Override
- public int hashCode() {
- int result = 1;
- for (int i = 0; i < size; ++i) {
- result = 31 * result + buffer[i];
- }
-
- return result;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("buffer", Arrays.copyOf(buffer, size))
- .add("size", size)
- .toString();
- }
-
- private void ensureCapacity(int minCapacity) {
- // If we have enough space, don't grow the buffer.
- if (minCapacity <= buffer.length) {
- return;
- }
-
- // Try to double the size of the buffer; if that's not enough, use the requested
- // minimum capacity. Note that we use Math.min(long, long) so the multiplication
- // cannot overflow.
- int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.length * 2L);
- if (newCapacity < minCapacity) {
- newCapacity = minCapacity;
- }
- buffer = Arrays.copyOf(buffer, newCapacity);
- }
-}
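
A short usage sketch of the ordering and increment semantics described above; the
demo class name and byte values are illustrative. It is placed in the same package
because POSITIVE_INFINITY is package-private.

package com.google.cloud.dataflow.sdk.util;

import java.io.IOException;

public class RandomAccessDataDemo {
  public static void main(String[] args) throws IOException {
    RandomAccessData a = new RandomAccessData(new byte[] {0x01, 0x7F});
    RandomAccessData b = new RandomAccessData(new byte[] {0x01, (byte) 0x80});

    // Unsigned lexicographic order: [0x01, 0x7F] < [0x01, 0x80].
    System.out.println(
        RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(a, b) < 0); // true

    // increment() yields the smallest same-length value strictly greater than
    // its input, so incrementing a produces a value equal to b.
    System.out.println(a.increment().equals(b)); // true

    // All-0xFF data has no same-length successor; increment() returns the
    // POSITIVE_INFINITY token, which the comparator orders above everything.
    RandomAccessData top = new RandomAccessData(new byte[] {(byte) 0xFF});
    System.out.println(top.increment() == RandomAccessData.POSITIVE_INFINITY); // true
  }
}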
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFn.java
deleted file mode 100644
index c5ef2ea..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFn.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-
-/**
- * Specification for processing to happen after elements have been grouped by key.
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of input values associated with the key.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public abstract class ReduceFn<K, InputT, OutputT, W extends BoundedWindow>
- implements Serializable {
-
- /** Information accessible to all the processing methods in this {@code ReduceFn}. */
- public abstract class Context {
- /** Return the key that is being processed. */
- public abstract K key();
-
-    /** Return the window that is being processed. */
- public abstract W window();
-
- /** Access the current {@link WindowingStrategy}. */
- public abstract WindowingStrategy<?, W> windowingStrategy();
-
- /** Return the interface for accessing state. */
- public abstract StateAccessor<K> state();
-
- /** Return the interface for accessing timers. */
- public abstract Timers timers();
- }
-
- /** Information accessible within {@link #processValue}. */
- public abstract class ProcessValueContext extends Context {
- /** Return the actual value being processed. */
- public abstract InputT value();
-
- /** Return the timestamp associated with the value. */
- public abstract Instant timestamp();
- }
-
- /** Information accessible within {@link #onMerge}. */
- public abstract class OnMergeContext extends Context {
- /** Return the interface for accessing state. */
- @Override
- public abstract MergingStateAccessor<K, W> state();
- }
-
- /** Information accessible within {@link #onTrigger}. */
- public abstract class OnTriggerContext extends Context {
- /** Returns the {@link PaneInfo} for the trigger firing being processed. */
- public abstract PaneInfo paneInfo();
-
- /** Output the given value in the current window. */
- public abstract void output(OutputT value);
- }
-
- //////////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * Called for each value of type {@code InputT} associated with the current key.
- */
- public abstract void processValue(ProcessValueContext c) throws Exception;
-
- /**
- * Called when windows are merged.
- */
- public abstract void onMerge(OnMergeContext context) throws Exception;
-
- /**
- * Called when triggers fire.
- *
- * <p>Implementations of {@link ReduceFn} should call {@link OnTriggerContext#output} to emit
- * any results that should be included in the pane produced by this trigger firing.
- */
- public abstract void onTrigger(OnTriggerContext context) throws Exception;
-
- /**
- * Called before {@link #onMerge} is invoked to provide an opportunity to prefetch any needed
- * state.
- *
-   * @param c Context to prefetch state from.
- */
- public void prefetchOnMerge(MergingStateAccessor<K, W> c) throws Exception {}
-
- /**
- * Called before {@link #onTrigger} is invoked to provide an opportunity to prefetch any needed
- * state.
- *
-   * @param context Context to prefetch state from.
- */
- public void prefetchOnTrigger(StateAccessor<K> context) {}
-
- /**
- * Called to clear any persisted state that the {@link ReduceFn} may be holding. This will be
-   * called when the window is closing and will receive no further interactions.
- */
- public abstract void clearState(Context context) throws Exception;
-
- /**
-   * Returns true if there is no buffered state.
- */
- public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
-}
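A skeletal, hypothetical subclass illustrating the lifecycle contract above; the state cells (counting, buffering) are deliberately elided, so only the shape of the callbacks is shown:

// Hypothetical: a ReduceFn that would count values per key and window.
class CountingReduceFn<K> extends ReduceFn<K, Long, Long, BoundedWindow> {
  @Override
  public void processValue(ProcessValueContext c) throws Exception {
    // Combine c.value() into per-window state via c.state().
  }

  @Override
  public void onMerge(OnMergeContext c) throws Exception {
    // Consolidate state from the merging windows via c.state().
  }

  @Override
  public void onTrigger(OnTriggerContext c) throws Exception {
    // Emit this pane's result; a real implementation reads the count
    // back out of state before calling output().
    c.output(0L);
  }

  @Override
  public void clearState(Context c) throws Exception {
    // Release any persisted state for the closing window.
  }

  @Override
  public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
    // Would report whether the state cells written in processValue
    // still hold anything; elided in this sketch.
    return null;
  }
}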
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
deleted file mode 100644
index bdbaf10..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.State;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateContext;
-import com.google.cloud.dataflow.sdk.util.state.StateContexts;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces.WindowNamespace;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Factory for creating instances of the various {@link ReduceFn} contexts.
- */
-class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
- public interface OnTriggerCallbacks<OutputT> {
- void output(OutputT toOutput);
- }
-
- private final K key;
- private final ReduceFn<K, InputT, OutputT, W> reduceFn;
- private final WindowingStrategy<?, W> windowingStrategy;
- private final StateInternals<K> stateInternals;
- private final ActiveWindowSet<W> activeWindows;
- private final TimerInternals timerInternals;
- private final WindowingInternals<?, ?> windowingInternals;
- private final PipelineOptions options;
-
- ReduceFnContextFactory(K key, ReduceFn<K, InputT, OutputT, W> reduceFn,
- WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals,
- ActiveWindowSet<W> activeWindows, TimerInternals timerInternals,
- WindowingInternals<?, ?> windowingInternals, PipelineOptions options) {
- this.key = key;
- this.reduceFn = reduceFn;
- this.windowingStrategy = windowingStrategy;
- this.stateInternals = stateInternals;
- this.activeWindows = activeWindows;
- this.timerInternals = timerInternals;
- this.windowingInternals = windowingInternals;
- this.options = options;
- }
-
- /** Where should we look for state associated with a given window? */
-  public enum StateStyle {
- /** All state is associated with the window itself. */
- DIRECT,
- /** State is associated with the 'state address' windows tracked by the active window set. */
- RENAMED
- }
-
- private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
- return new StateAccessorImpl<K, W>(
- activeWindows, windowingStrategy.getWindowFn().windowCoder(),
- stateInternals, StateContexts.createFromComponents(options, windowingInternals, window),
- style);
- }
-
- public ReduceFn<K, InputT, OutputT, W>.Context base(W window, StateStyle style) {
- return new ContextImpl(stateAccessor(window, style));
- }
-
- public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
- W window, InputT value, Instant timestamp, StateStyle style) {
- return new ProcessValueContextImpl(stateAccessor(window, style), value, timestamp);
- }
-
- public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
- ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
- return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
- }
-
- public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge(
- Collection<W> activeToBeMerged, W mergeResult, StateStyle style) {
- return new OnMergeContextImpl(
- new MergingStateAccessorImpl<K, W>(activeWindows,
- windowingStrategy.getWindowFn().windowCoder(),
- stateInternals, style, activeToBeMerged, mergeResult));
- }
-
- public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forPremerge(W window) {
- return new OnPremergeContextImpl(new PremergingStateAccessorImpl<K, W>(
- activeWindows, windowingStrategy.getWindowFn().windowCoder(), stateInternals, window));
- }
-
- private class TimersImpl implements Timers {
- private final StateNamespace namespace;
-
- public TimersImpl(StateNamespace namespace) {
- Preconditions.checkArgument(namespace instanceof WindowNamespace);
- this.namespace = namespace;
- }
-
- @Override
- public void setTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
- }
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timerInternals.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timerInternals.currentSynchronizedProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentEventTime() {
- return timerInternals.currentInputWatermarkTime();
- }
- }
-
- // ======================================================================
- // StateAccessors
- // ======================================================================
- static class StateAccessorImpl<K, W extends BoundedWindow> implements StateAccessor<K> {
-
- protected final ActiveWindowSet<W> activeWindows;
- protected final StateContext<W> context;
- protected final StateNamespace windowNamespace;
- protected final Coder<W> windowCoder;
- protected final StateInternals<K> stateInternals;
- protected final StateStyle style;
-
- public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
- StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) {
-
- this.activeWindows = activeWindows;
- this.windowCoder = windowCoder;
- this.stateInternals = stateInternals;
- this.context = checkNotNull(context);
- this.windowNamespace = namespaceFor(context.window());
- this.style = style;
- }
-
- protected StateNamespace namespaceFor(W window) {
- return StateNamespaces.window(windowCoder, window);
- }
-
- protected StateNamespace windowNamespace() {
- return windowNamespace;
- }
-
- W window() {
- return context.window();
- }
-
- StateNamespace namespace() {
- return windowNamespace();
- }
-
- @Override
- public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
- switch (style) {
- case DIRECT:
- return stateInternals.state(windowNamespace(), address, context);
- case RENAMED:
- return stateInternals.state(
- namespaceFor(activeWindows.writeStateAddress(context.window())), address, context);
- }
- throw new RuntimeException(); // cases are exhaustive.
- }
- }
-
- static class MergingStateAccessorImpl<K, W extends BoundedWindow>
- extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
- private final Collection<W> activeToBeMerged;
-
- public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
- StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged,
- W mergeResult) {
- super(activeWindows, windowCoder, stateInternals,
- StateContexts.windowOnly(mergeResult), style);
- this.activeToBeMerged = activeToBeMerged;
- }
-
- @Override
- public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
- switch (style) {
- case DIRECT:
- return stateInternals.state(windowNamespace(), address, context);
- case RENAMED:
- return stateInternals.state(
- namespaceFor(activeWindows.mergedWriteStateAddress(
- activeToBeMerged, context.window())),
- address,
- context);
- }
- throw new RuntimeException(); // cases are exhaustive.
- }
-
- @Override
- public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super K, StateT> address) {
- ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
- for (W mergingWindow : activeToBeMerged) {
- StateNamespace namespace = null;
- switch (style) {
- case DIRECT:
- namespace = namespaceFor(mergingWindow);
- break;
- case RENAMED:
- namespace = namespaceFor(activeWindows.writeStateAddress(mergingWindow));
- break;
- }
- Preconditions.checkNotNull(namespace); // cases are exhaustive.
- builder.put(mergingWindow, stateInternals.state(namespace, address, context));
- }
- return builder.build();
- }
- }
-
- static class PremergingStateAccessorImpl<K, W extends BoundedWindow>
- extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
- public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
- StateInternals<K> stateInternals, W window) {
- super(activeWindows, windowCoder, stateInternals,
- StateContexts.windowOnly(window), StateStyle.RENAMED);
- }
-
- Collection<W> mergingWindows() {
- return activeWindows.readStateAddresses(context.window());
- }
-
- @Override
- public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super K, StateT> address) {
- ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
- for (W stateAddressWindow : activeWindows.readStateAddresses(context.window())) {
- StateT stateForWindow =
- stateInternals.state(namespaceFor(stateAddressWindow), address, context);
- builder.put(stateAddressWindow, stateForWindow);
- }
- return builder.build();
- }
- }
-
- // ======================================================================
- // Contexts
- // ======================================================================
-
- private class ContextImpl extends ReduceFn<K, InputT, OutputT, W>.Context {
- private final StateAccessorImpl<K, W> state;
- private final TimersImpl timers;
-
- private ContextImpl(StateAccessorImpl<K, W> state) {
- reduceFn.super();
- this.state = state;
- this.timers = new TimersImpl(state.namespace());
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public W window() {
- return state.window();
- }
-
- @Override
- public WindowingStrategy<?, W> windowingStrategy() {
- return windowingStrategy;
- }
-
- @Override
- public StateAccessor<K> state() {
- return state;
- }
-
- @Override
- public Timers timers() {
- return timers;
- }
- }
-
- private class ProcessValueContextImpl
- extends ReduceFn<K, InputT, OutputT, W>.ProcessValueContext {
- private final InputT value;
- private final Instant timestamp;
- private final StateAccessorImpl<K, W> state;
- private final TimersImpl timers;
-
- private ProcessValueContextImpl(StateAccessorImpl<K, W> state,
- InputT value, Instant timestamp) {
- reduceFn.super();
- this.state = state;
- this.value = value;
- this.timestamp = timestamp;
- this.timers = new TimersImpl(state.namespace());
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public W window() {
- return state.window();
- }
-
- @Override
- public WindowingStrategy<?, W> windowingStrategy() {
- return windowingStrategy;
- }
-
- @Override
- public StateAccessor<K> state() {
- return state;
- }
-
- @Override
- public InputT value() {
- return value;
- }
-
- @Override
- public Instant timestamp() {
- return timestamp;
- }
-
- @Override
- public Timers timers() {
- return timers;
- }
- }
-
- private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
- private final StateAccessorImpl<K, W> state;
- private final ReadableState<PaneInfo> pane;
- private final OnTriggerCallbacks<OutputT> callbacks;
- private final TimersImpl timers;
-
- private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
- OnTriggerCallbacks<OutputT> callbacks) {
- reduceFn.super();
- this.state = state;
- this.pane = pane;
- this.callbacks = callbacks;
- this.timers = new TimersImpl(state.namespace());
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public W window() {
- return state.window();
- }
-
- @Override
- public WindowingStrategy<?, W> windowingStrategy() {
- return windowingStrategy;
- }
-
- @Override
- public StateAccessor<K> state() {
- return state;
- }
-
- @Override
- public PaneInfo paneInfo() {
- return pane.read();
- }
-
- @Override
- public void output(OutputT value) {
- callbacks.output(value);
- }
-
- @Override
- public Timers timers() {
- return timers;
- }
- }
-
- private class OnMergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
- private final MergingStateAccessorImpl<K, W> state;
- private final TimersImpl timers;
-
- private OnMergeContextImpl(MergingStateAccessorImpl<K, W> state) {
- reduceFn.super();
- this.state = state;
- this.timers = new TimersImpl(state.namespace());
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public WindowingStrategy<?, W> windowingStrategy() {
- return windowingStrategy;
- }
-
- @Override
- public MergingStateAccessor<K, W> state() {
- return state;
- }
-
- @Override
- public W window() {
- return state.window();
- }
-
- @Override
- public Timers timers() {
- return timers;
- }
- }
-
- private class OnPremergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
- private final PremergingStateAccessorImpl<K, W> state;
- private final TimersImpl timers;
-
- private OnPremergeContextImpl(PremergingStateAccessorImpl<K, W> state) {
- reduceFn.super();
- this.state = state;
- this.timers = new TimersImpl(state.namespace());
- }
-
- @Override
- public K key() {
- return key;
- }
-
- @Override
- public WindowingStrategy<?, W> windowingStrategy() {
- return windowingStrategy;
- }
-
- @Override
- public MergingStateAccessor<K, W> state() {
- return state;
- }
-
- @Override
- public W window() {
- return state.window();
- }
-
- @Override
- public Timers timers() {
- return timers;
- }
- }
-}
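The DIRECT/RENAMED indirection in the access() implementations above boils down to the following lookup pattern. This is a self-contained conceptual sketch, not SDK code: ActiveWindowSet.writeStateAddress is stubbed with a plain map, and windows are just strings.

import java.util.HashMap;
import java.util.Map;

class StateAddressSketch {
  enum StateStyle { DIRECT, RENAMED }

  // Stub for ActiveWindowSet.writeStateAddress(window): a merged window may
  // keep writing under the state address of one of its original windows.
  static final Map<String, String> WRITE_ADDRESS = new HashMap<>();

  static String namespaceFor(String window, StateStyle style) {
    switch (style) {
      case DIRECT:
        return window;                                      // state lives on the window itself
      case RENAMED:
        return WRITE_ADDRESS.getOrDefault(window, window);  // redirected write address
    }
    throw new AssertionError("cases are exhaustive");
  }

  public static void main(String[] args) {
    WRITE_ADDRESS.put("[10,20)", "[10,15)");
    System.out.println(namespaceFor("[10,20)", StateStyle.DIRECT));   // [10,20)
    System.out.println(namespaceFor("[10,20)", StateStyle.RENAMED));  // [10,15)
  }
}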