You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:32 UTC
[08/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StringUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StringUtils.java
deleted file mode 100644
index 3a18336..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/StringUtils.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Utilities for working with JSON and other human-readable string formats.
- */
-public class StringUtils {
-  /** Lowercase hexadecimal digits, indexed by nibble value. */
-  private static final char[] HEX_DIGITS = "0123456789abcdef".toCharArray();
-
-  /**
-   * Converts the given array of bytes into a legal JSON string.
-   *
-   * <p>Uses a simple strategy of converting each byte to a single char,
-   * except for non-printable chars, non-ASCII chars, and '%', '\',
-   * and '"', which are encoded as three chars in '%xx' format, where
-   * 'xx' is the lowercase hexadecimal encoding of the byte.
-   *
-   * <p>The result can be decoded again with {@link #jsonStringToByteArray}.
-   */
-  public static String byteArrayToJsonString(byte[] bytes) {
-    StringBuilder sb = new StringBuilder(bytes.length * 2);
-    for (byte b : bytes) {
-      if (b >= 32 && b < 127) {
-        // A printable ASCII character.
-        char c = (char) b;
-        if (c != '%' && c != '\\' && c != '"') {
-          // Not an escape prefix or special character either; send through unchanged.
-          sb.append(c);
-          continue;
-        }
-      }
-      // Send through escaped, in '%xx' format. Masking each nibble encodes negative
-      // (non-ASCII) bytes as their unsigned value, e.g. -1 becomes "%ff".
-      sb.append('%').append(HEX_DIGITS[(b >> 4) & 0xF]).append(HEX_DIGITS[b & 0xF]);
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Converts the given string, encoded using {@link #byteArrayToJsonString},
-   * into a byte array.
-   *
-   * <p>Every '%' must be followed by exactly two ASCII hexadecimal digits (either case).
-   * Every other char is copied as a single byte, so it must lie in the range U+0000 to U+00FF.
-   *
-   * @throws IllegalArgumentException if the argument string is not legal
-   */
-  public static byte[] jsonStringToByteArray(String string) {
-    // Each input char yields at most one byte, so the input length bounds the output length.
-    byte[] buffer = new byte[string.length()];
-    int length = 0;
-    for (int i = 0; i < string.length(); ) {
-      char c = string.charAt(i);
-      if (c == '%') {
-        // Escaped. Expect '%xx' format. The digits are validated explicitly because
-        // Integer.parseInt would also accept a sign ("%+f", "%-1") or non-ASCII digits.
-        int high = i + 1 < string.length() ? hexValue(string.charAt(i + 1)) : -1;
-        int low = i + 2 < string.length() ? hexValue(string.charAt(i + 2)) : -1;
-        if (high < 0 || low < 0) {
-          throw new IllegalArgumentException(
-              "not in legal encoded format; "
-                  + "substring [" + i + ".." + (i + 2) + "] not in format \"%xx\"");
-        }
-        buffer[length++] = (byte) ((high << 4) | low);
-        i += 3;
-      } else {
-        if (c > 0xFF) {
-          // byteArrayToJsonString never produces such a char, and casting it to a byte
-          // would silently discard its high-order bits.
-          throw new IllegalArgumentException(
-              "not in legal encoded format; char at index " + i + " does not fit in a byte");
-        }
-        // Send through unchanged.
-        buffer[length++] = (byte) c;
-        i++;
-      }
-    }
-    byte[] result = new byte[length];
-    System.arraycopy(buffer, 0, result, 0, length);
-    return result;
-  }
-
-  /** Returns the value of an ASCII hexadecimal digit, or -1 if {@code ch} is not one. */
-  private static int hexValue(char ch) {
-    if (ch >= '0' && ch <= '9') {
-      return ch - '0';
-    }
-    if (ch >= 'a' && ch <= 'f') {
-      return ch - 'a' + 10;
-    }
-    if (ch >= 'A' && ch <= 'F') {
-      return ch - 'A' + 10;
-    }
-    return -1;
-  }
-
-  private static final String[] STANDARD_NAME_SUFFIXES =
-      new String[]{"DoFn", "Fn"};
-
-  /**
-   * Pattern to match a non-anonymous inner class.
-   * Eg, matches "Foo$Bar", or even "Foo$1$Bar", but not "Foo$1" or "Foo$1$2".
-   */
-  private static final Pattern NAMED_INNER_CLASS =
-      Pattern.compile(".+\\$(?<INNER>[^0-9].*)");
-
-  /** Matches an anonymous class component ("$1$") sitting between two other components. */
-  private static final String ANONYMOUS_CLASS_REGEX = "\\$[0-9]+\\$";
-
-  /**
-   * Returns a simple name for a class.
-   *
-   * <p>Note: this is non-invertible - the name may be simplified to an
-   * extent that it cannot be mapped back to the original class.
-   *
-   * <p>This can be used to generate human-readable names. It
-   * removes the package and outer classes from the name,
-   * and removes common suffixes.
-   *
-   * <p>Examples:
-   * <ul>
-   *   <li>{@code some.package.Word.SummaryDoFn} yields "Summary"
-   *   <li>{@code another.package.PairingFn} yields "Pairing"
-   * </ul>
-   *
-   * @throws IllegalArgumentException if the class is anonymous
-   */
-  public static String approximateSimpleName(Class<?> clazz) {
-    return approximateSimpleName(clazz, /* dropOuterClassNames */ true);
-  }
-
-  /**
-   * Returns a name for a PTransform class.
-   *
-   * <p>This can be used to generate human-readable transform names. It
-   * removes the package from the name, and removes common suffixes.
-   *
-   * <p>It is different than approximateSimpleName:
-   * <ul>
-   *   <li>1. It keeps the outer classes names.
-   *   <li>2. It removes the common transform inner class: "Bound".
-   * </ul>
-   *
-   * <p>Examples:
-   * <ul>
-   *   <li>{@code some.package.Word.Summary} yields "Word.Summary"
-   *   <li>{@code another.package.Pairing.Bound} yields "Pairing"
-   * </ul>
-   *
-   * @throws IllegalArgumentException if {@code clazz} is not a {@link PTransform} subclass, or
-   *     is anonymous
-   */
-  public static String approximatePTransformName(Class<?> clazz) {
-    Preconditions.checkArgument(PTransform.class.isAssignableFrom(clazz),
-        "%s is not a subclass of PTransform", clazz);
-    return approximateSimpleName(clazz, /* dropOuterClassNames */ false)
-        .replaceFirst("\\.Bound$", "");
-  }
-
-  /**
-   * Calculate the Levenshtein distance between two strings.
-   *
-   * <p>The Levenshtein distance between two words is the minimum number of single-character edits
-   * (i.e. insertions, deletions or substitutions) required to change one string into the other.
-   *
-   * @throws NullPointerException if either argument is null
-   */
-  public static int getLevenshteinDistance(final String s, final String t) {
-    Preconditions.checkNotNull(s);
-    Preconditions.checkNotNull(t);
-
-    // base cases
-    if (s.equals(t)) {
-      return 0;
-    }
-    if (s.length() == 0) {
-      return t.length();
-    }
-    if (t.length() == 0) {
-      return s.length();
-    }
-
-    // Two rolling rows of the edit-distance table; they are swapped, not copied, per row.
-    int[] previous = new int[t.length() + 1];
-    int[] current = new int[t.length() + 1];
-
-    // Row A[0][j]: distance from an empty s to the first j chars of t (j insertions).
-    for (int j = 0; j < previous.length; j++) {
-      previous[j] = j;
-    }
-
-    for (int i = 0; i < s.length(); i++) {
-      // A[i+1][0]: delete the first (i+1) chars of s to match an empty t.
-      current[0] = i + 1;
-
-      for (int j = 0; j < t.length(); j++) {
-        int cost = (s.charAt(i) == t.charAt(j)) ? 0 : 1;
-        current[j + 1] = Math.min(Math.min(current[j] + 1, previous[j + 1] + 1),
-            previous[j] + cost);
-      }
-
-      // The row just computed becomes the previous row of the next iteration.
-      int[] swap = previous;
-      previous = current;
-      current = swap;
-    }
-
-    // After the final swap, the last computed row is held in 'previous'.
-    return previous[t.length()];
-  }
-
-  /**
-   * Simplifies the binary name of {@code clazz}: drops the package, strips common suffixes
-   * from each '$'-separated component, then either keeps only the innermost named class or
-   * joins the components with '.' (dropping anonymous components between them).
-   */
-  private static String approximateSimpleName(Class<?> clazz, boolean dropOuterClassNames) {
-    Preconditions.checkArgument(!clazz.isAnonymousClass(),
-        "Attempted to get simple name of anonymous class");
-
-    String fullName = clazz.getName();
-    String shortName = fullName.substring(fullName.lastIndexOf('.') + 1);
-
-    // Drop common suffixes for each named component.
-    String[] names = shortName.split("\\$");
-    for (int i = 0; i < names.length; i++) {
-      names[i] = simplifyNameComponent(names[i]);
-    }
-    shortName = Joiner.on('$').join(names);
-
-    if (dropOuterClassNames) {
-      // Simplify inner class name by dropping outer class prefixes.
-      Matcher m = NAMED_INNER_CLASS.matcher(shortName);
-      if (m.matches()) {
-        shortName = m.group("INNER");
-      }
-    } else {
-      // Dropping anonymous outer classes
-      shortName = shortName.replaceAll(ANONYMOUS_CLASS_REGEX, ".");
-      shortName = shortName.replaceAll("\\$", ".");
-    }
-    return shortName;
-  }
-
-  /** Strips the first matching standard suffix, unless that would leave an empty name. */
-  private static String simplifyNameComponent(String name) {
-    for (String suffix : STANDARD_NAME_SUFFIXES) {
-      if (name.endsWith(suffix) && name.length() > suffix.length()) {
-        return name.substring(0, name.length() - suffix.length());
-      }
-    }
-    return name;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Structs.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Structs.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Structs.java
deleted file mode 100644
index c621c55..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Structs.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.util.Data;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A collection of static methods for manipulating datastructure representations
- * transferred via the Dataflow API.
- *
- * <p>Getters come in two flavors. The two-argument form throws if the parameter is absent. The
- * form taking a default value returns that default when the map has no such key. In both, a key
- * that is present but maps to a Java {@code null}, or to a value of the wrong shape, is reported
- * as an incorrect type.
- */
-public final class Structs {
-  private Structs() {} // Non-instantiable
-
-  /**
-   * Returns the string parameter {@code name}.
-   *
-   * @throws Exception if the parameter is missing or is not a string
-   */
-  public static String getString(Map<String, Object> map, String name) throws Exception {
-    return getValue(map, name, String.class, "a string");
-  }
-
-  /**
-   * Returns the string parameter {@code name}, or {@code defaultValue} if the key is absent.
-   *
-   * @throws Exception if the parameter is present but is not a string
-   */
-  @Nullable
-  public static String getString(
-      Map<String, Object> map, String name, @Nullable String defaultValue)
-      throws Exception {
-    return getValue(map, name, String.class, "a string", defaultValue);
-  }
-
-  /**
-   * Returns the bytes held, in {@link StringUtils#byteArrayToJsonString} encoding, by the string
-   * parameter {@code name}.
-   *
-   * @throws Exception if the parameter is missing, is not a string, or is not legally encoded
-   */
-  public static byte[] getBytes(Map<String, Object> map, String name) throws Exception {
-    @Nullable byte[] result = getBytes(map, name, null);
-    if (result == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    return result;
-  }
-
-  /**
-   * Returns the bytes held by the string parameter {@code name}, or {@code defaultValue} if the
-   * key is absent.
-   *
-   * @throws Exception if the parameter is present but is not a legally encoded string
-   */
-  @Nullable
-  public static byte[] getBytes(Map<String, Object> map, String name, @Nullable byte[] defaultValue)
-      throws Exception {
-    @Nullable String jsonString = getString(map, name, null);
-    if (jsonString == null) {
-      return defaultValue;
-    }
-    // TODO: Need to agree on a format for encoding bytes in
-    // a string that can be sent to the backend, over the cloud
-    // map task work API. base64 encoding seems pretty common. Switch to it?
-    return StringUtils.jsonStringToByteArray(jsonString);
-  }
-
-  /**
-   * Returns the boolean parameter {@code name}.
-   *
-   * @throws Exception if the parameter is missing or is not a boolean
-   */
-  public static Boolean getBoolean(Map<String, Object> map, String name) throws Exception {
-    return getValue(map, name, Boolean.class, "a boolean");
-  }
-
-  /**
-   * Returns the boolean parameter {@code name}, or {@code defaultValue} if the key is absent.
-   *
-   * @throws Exception if the parameter is present but is not a boolean
-   */
-  @Nullable
-  public static Boolean getBoolean(
-      Map<String, Object> map, String name, @Nullable Boolean defaultValue)
-      throws Exception {
-    return getValue(map, name, Boolean.class, "a boolean", defaultValue);
-  }
-
-  /**
-   * Returns the long parameter {@code name}.
-   *
-   * @throws Exception if the parameter is missing or is not a long
-   */
-  public static Long getLong(Map<String, Object> map, String name) throws Exception {
-    return getValue(map, name, Long.class, "a long");
-  }
-
-  /**
-   * Returns the long parameter {@code name}, or {@code defaultValue} if the key is absent.
-   *
-   * @throws Exception if the parameter is present but is not a long
-   */
-  @Nullable
-  public static Long getLong(Map<String, Object> map, String name, @Nullable Long defaultValue)
-      throws Exception {
-    return getValue(map, name, Long.class, "a long", defaultValue);
-  }
-
-  /**
-   * Returns the int parameter {@code name}.
-   *
-   * @throws Exception if the parameter is missing or is not an int
-   */
-  public static Integer getInt(Map<String, Object> map, String name) throws Exception {
-    return getValue(map, name, Integer.class, "an int");
-  }
-
-  /**
-   * Returns the int parameter {@code name}, or {@code defaultValue} if the key is absent.
-   *
-   * @throws Exception if the parameter is present but is not an int
-   */
-  @Nullable
-  public static Integer getInt(Map<String, Object> map, String name, @Nullable Integer defaultValue)
-      throws Exception {
-    return getValue(map, name, Integer.class, "an int", defaultValue);
-  }
-
-  /**
-   * Returns parameter {@code name} as a list of strings, or {@code defaultValue} if the key is
-   * absent.
-   *
-   * <p>A JSON literal null yields an empty list. A single string value yields a singleton list.
-   * A list must contain only string values.
-   *
-   * @throws Exception if the parameter is present but is neither a string nor a list of strings
-   */
-  @Nullable
-  public static List<String> getStrings(
-      Map<String, Object> map, String name, @Nullable List<String> defaultValue)
-      throws Exception {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "a string or a list");
-      }
-      return defaultValue;
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null. When represented as a list of strings,
-      // this is an empty list.
-      return Collections.<String>emptyList();
-    }
-    @Nullable String singletonString = decodeValue(value, String.class);
-    if (singletonString != null) {
-      return Collections.singletonList(singletonString);
-    }
-    if (!(value instanceof List)) {
-      throw new IncorrectTypeException(name, map, "a string or a list");
-    }
-    @SuppressWarnings("unchecked")
-    List<Object> elements = (List<Object>) value;
-    List<String> result = new ArrayList<>(elements.size());
-    for (Object o : elements) {
-      @Nullable String s = decodeValue(o, String.class);
-      if (s == null) {
-        throw new IncorrectTypeException(name, map, "a list of strings");
-      }
-      result.add(s);
-    }
-    return result;
-  }
-
-  /**
-   * Returns the object parameter {@code name}: a map carrying a
-   * {@link PropertyNames#OBJECT_TYPE_NAME} field.
-   *
-   * @throws Exception if the parameter is missing or is not such an object
-   */
-  public static Map<String, Object> getObject(Map<String, Object> map, String name)
-      throws Exception {
-    @Nullable Map<String, Object> result = getObject(map, name, null);
-    if (result == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    return result;
-  }
-
-  /**
-   * Returns the object parameter {@code name}, or {@code defaultValue} if the key is absent.
-   * A JSON literal null yields an empty map.
-   *
-   * @throws Exception if the parameter is present but is not an object
-   */
-  @Nullable
-  public static Map<String, Object> getObject(
-      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue)
-      throws Exception {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "an object");
-      }
-      return defaultValue;
-    }
-    return checkObject(value, map, name);
-  }
-
-  /**
-   * Checks that the non-null {@code value} of parameter {@code name} is an object. That is, it is
-   * either a JSON literal null (returned as an empty map) or a map with an
-   * {@link PropertyNames#OBJECT_TYPE_NAME} field.
-   */
-  private static Map<String, Object> checkObject(
-      Object value, Map<String, Object> map, String name) throws Exception {
-    if (Data.isNull(value)) {
-      // This is a JSON literal null. When represented as an object, this is an
-      // empty map.
-      return Collections.<String, Object>emptyMap();
-    }
-    if (!(value instanceof Map)) {
-      throw new IncorrectTypeException(name, map, "an object (not a map)");
-    }
-    @SuppressWarnings("unchecked")
-    Map<String, Object> mapValue = (Map<String, Object>) value;
-    if (!mapValue.containsKey(PropertyNames.OBJECT_TYPE_NAME)) {
-      throw new IncorrectTypeException(name, map,
-          "an object (no \"" + PropertyNames.OBJECT_TYPE_NAME + "\" field)");
-    }
-    return mapValue;
-  }
-
-  /**
-   * Returns parameter {@code name} as a list of maps, or {@code defaultValue} if the key is
-   * absent. A JSON literal null yields an empty list. The returned list is the stored list
-   * itself, not a copy.
-   *
-   * @throws Exception if the parameter is present but is not a list whose elements are all maps
-   */
-  @Nullable
-  public static List<Map<String, Object>> getListOfMaps(Map<String, Object> map, String name,
-      @Nullable List<Map<String, Object>> defaultValue) throws Exception {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "a list");
-      }
-      return defaultValue;
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null. When represented as a list,
-      // this is an empty list.
-      return Collections.<Map<String, Object>>emptyList();
-    }
-
-    if (!(value instanceof List)) {
-      throw new IncorrectTypeException(name, map, "a list");
-    }
-
-    List<?> elements = (List<?>) value;
-    for (Object elem : elements) {
-      if (!(elem instanceof Map)) {
-        throw new IncorrectTypeException(name, map, "a list of Map objects");
-      }
-    }
-
-    @SuppressWarnings("unchecked")
-    List<Map<String, Object>> result = (List<Map<String, Object>>) elements;
-    return result;
-  }
-
-  /**
-   * Returns the dictionary (map) parameter {@code name}. A JSON literal null yields an empty map.
-   *
-   * <p>Delegates to the defaulting overload, so a key that is present but maps to a Java
-   * {@code null} is reported as an incorrect type, consistent with the other getters.
-   *
-   * @throws Exception if the parameter is missing or is not a dictionary
-   */
-  public static Map<String, Object> getDictionary(
-      Map<String, Object> map, String name) throws Exception {
-    @Nullable Map<String, Object> result = getDictionary(map, name, null);
-    if (result == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    return result;
-  }
-
-  /**
-   * Returns the dictionary (map) parameter {@code name}, or {@code defaultValue} if the key is
-   * absent. A JSON literal null yields an empty map.
-   *
-   * @throws Exception if the parameter is present but is not a dictionary
-   */
-  @Nullable
-  public static Map<String, Object> getDictionary(
-      Map<String, Object> map, String name, @Nullable Map<String, Object> defaultValue)
-      throws Exception {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, "a dictionary");
-      }
-      return defaultValue;
-    }
-    if (Data.isNull(value)) {
-      // This is a JSON literal null. When represented as a dictionary, this is
-      // an empty map.
-      return Collections.<String, Object>emptyMap();
-    }
-    if (!(value instanceof Map)) {
-      throw new IncorrectTypeException(name, map, "a dictionary");
-    }
-    @SuppressWarnings("unchecked")
-    Map<String, Object> result = (Map<String, Object>) value;
-    return result;
-  }
-
-  // Builder operations.
-
-  /** Stores {@code value} under {@code name}, wrapped via {@link CloudObject#forString}. */
-  public static void addString(Map<String, Object> map, String name, String value) {
-    addObject(map, name, CloudObject.forString(value));
-  }
-
-  /** Stores {@code value} under {@code name}, wrapped via {@link CloudObject#forBoolean}. */
-  public static void addBoolean(Map<String, Object> map, String name, boolean value) {
-    addObject(map, name, CloudObject.forBoolean(value));
-  }
-
-  /** Stores {@code value} under {@code name}, wrapped via {@link CloudObject#forInteger}. */
-  public static void addLong(Map<String, Object> map, String name, long value) {
-    addObject(map, name, CloudObject.forInteger(value));
-  }
-
-  /** Stores the object {@code value} under {@code name}, replacing any existing entry. */
-  public static void addObject(
-      Map<String, Object> map, String name, Map<String, Object> value) {
-    map.put(name, value);
-  }
-
-  /** Stores a JSON literal null ({@code Data.nullOf(Object.class)}) under {@code name}. */
-  public static void addNull(Map<String, Object> map, String name) {
-    map.put(name, Data.nullOf(Object.class));
-  }
-
-  /** Stores a new list under {@code name}, holding each of {@code longs} as a wrapped integer. */
-  public static void addLongs(Map<String, Object> map, String name, long... longs) {
-    List<Map<String, Object>> elements = new ArrayList<>(longs.length);
-    // Iterate over the primitives directly; there is no need to box each element.
-    for (long value : longs) {
-      elements.add(CloudObject.forInteger(value));
-    }
-    map.put(name, elements);
-  }
-
-  /** Stores {@code elements} itself (not a copy) under {@code name}. */
-  public static void addList(
-      Map<String, Object> map, String name, List<? extends Map<String, Object>> elements) {
-    map.put(name, elements);
-  }
-
-  /** Stores a new list under {@code name}, holding each of {@code elements} as a wrapped string. */
-  public static void addStringList(Map<String, Object> map, String name, List<String> elements) {
-    List<CloudObject> objects = new ArrayList<>(elements.size());
-    for (String element : elements) {
-      objects.add(CloudObject.forString(element));
-    }
-    addList(map, name, objects);
-  }
-
-  /** Stores a fixed-size list view ({@link Arrays#asList}) of {@code elements} under {@code name}. */
-  public static <T extends Map<String, Object>> void addList(
-      Map<String, Object> map, String name, T[] elements) {
-    map.put(name, Arrays.asList(elements));
-  }
-
-  /** Stores the dictionary {@code value} under {@code name}, replacing any existing entry. */
-  public static void addDictionary(
-      Map<String, Object> map, String name, Map<String, Object> value) {
-    map.put(name, value);
-  }
-
-  /** Stores {@code value} under {@code name}, wrapped via {@link CloudObject#forFloat}. */
-  public static void addDouble(Map<String, Object> map, String name, Double value) {
-    addObject(map, name, CloudObject.forFloat(value));
-  }
-
-  // Helper methods for a few of the accessor methods.
-
-  /** Like the defaulting overload, but throws if the key is absent. */
-  private static <T> T getValue(Map<String, Object> map, String name, Class<T> clazz, String type)
-      throws Exception {
-    @Nullable T result = getValue(map, name, clazz, type, null);
-    if (result == null) {
-      throw new ParameterNotFoundException(name, map);
-    }
-    return result;
-  }
-
-  /**
-   * Returns parameter {@code name} decoded as {@code clazz}, or {@code defaultValue} if the key
-   * is absent. {@code type} describes the expected type in error messages.
-   */
-  @Nullable
-  private static <T> T getValue(
-      Map<String, Object> map, String name, Class<T> clazz, String type, @Nullable T defaultValue)
-      throws Exception {
-    @Nullable Object value = map.get(name);
-    if (value == null) {
-      if (map.containsKey(name)) {
-        throw new IncorrectTypeException(name, map, type);
-      }
-      return defaultValue;
-    }
-    T result = decodeValue(value, clazz);
-    if (result == null) {
-      // The value exists, but can't be decoded.
-      throw new IncorrectTypeException(name, map, type);
-    }
-    return result;
-  }
-
-  /**
-   * Decodes {@code value} as {@code clazz}. The value is accepted either as an instance of exactly
-   * that class, or as a map with a known {@link PropertyNames#OBJECT_TYPE_NAME} and a
-   * {@link PropertyNames#SCALAR_FIELD_NAME} field. Returns null if it cannot be decoded.
-   */
-  @Nullable
-  private static <T> T decodeValue(Object value, Class<T> clazz) {
-    try {
-      if (value.getClass() == clazz) {
-        // decodeValue() is only called for final classes; if the class matches,
-        // it's safe to just return the value, and if it doesn't match, decoding
-        // is needed.
-        return clazz.cast(value);
-      }
-      if (!(value instanceof Map)) {
-        return null;
-      }
-      @SuppressWarnings("unchecked")
-      Map<String, Object> map = (Map<String, Object>) value;
-      @Nullable String typeName = (String) map.get(PropertyNames.OBJECT_TYPE_NAME);
-      if (typeName == null) {
-        return null;
-      }
-      @Nullable CloudKnownType knownType = CloudKnownType.forUri(typeName);
-      if (knownType == null) {
-        return null;
-      }
-      @Nullable Object scalar = map.get(PropertyNames.SCALAR_FIELD_NAME);
-      if (scalar == null) {
-        return null;
-      }
-      return knownType.parse(scalar, clazz);
-    } catch (ClassCastException e) {
-      // If any class cast fails during decoding, the value's not decodable.
-      return null;
-    }
-  }
-
-  /** Thrown when a required parameter is absent from the map. */
-  private static final class ParameterNotFoundException extends Exception {
-    public ParameterNotFoundException(String name, Map<String, Object> map) {
-      super("didn't find required parameter " + name + " in " + map);
-    }
-  }
-
-  /** Thrown when a parameter is present but does not have the expected type or shape. */
-  private static final class IncorrectTypeException extends Exception {
-    public IncorrectTypeException(String name, Map<String, Object> map, String type) {
-      super("required parameter " + name + " in " + map + " not " + type);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemDoFnInternal.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemDoFnInternal.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemDoFnInternal.java
deleted file mode 100644
index 3255ede..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemDoFnInternal.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation to mark {@link DoFn DoFns} as an internal component of the Dataflow SDK.
- *
- * <p>Currently, the only effect of this is to mark any aggregators reported by an annotated
- * {@code DoFn} as a system counter (as opposed to a user counter).
- *
- * <p>This is internal to the Dataflow SDK.
- */
-@Documented
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-public @interface SystemDoFnInternal {}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemReduceFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemReduceFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemReduceFn.java
deleted file mode 100644
index 1665792..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SystemReduceFn.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.state.AccumulatorCombiningState;
-import com.google.cloud.dataflow.sdk.util.state.BagState;
-import com.google.cloud.dataflow.sdk.util.state.CombiningState;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.ReadableState;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateMerging;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-
-/**
- * {@link ReduceFn} implementing the default reduction behaviors of {@link GroupByKey}.
- *
- * <p>Every input value is added to a single piece of system state (the "buffer"). When a trigger
- * fires, the buffer is read and its contents are output.
- *
- * @param <K> The type of key being processed.
- * @param <InputT> The type of values associated with the key.
- * @param <AccumT> The type accumulated in state: the buffered {@code Iterable} for
- *     {@link #buffering}, or the combine function's accumulator type for {@link #combining}.
- * @param <OutputT> The output type that will be produced for each key.
- * @param <W> The type of windows this operates on.
- */
-public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
-    extends ReduceFn<K, InputT, OutputT, W> {
-  /** Name of the system state tag that holds the buffered or combined input. */
-  private static final String BUFFER_NAME = "buf";
-
-  /**
-   * Create a factory that produces {@link SystemReduceFn} instances that buffer all of the
-   * input values in persistent state and produce an {@code Iterable<T>}.
-   *
-   * @param inputCoder coder used to store the buffered values in bag state
-   */
-  public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>
-      buffering(final Coder<T> inputCoder) {
-    final StateTag<Object, BagState<T>> bufferTag =
-        StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
-    return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) {
-      @Override
-      public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
-        StateMerging.prefetchBags(state, bufferTag);
-      }
-
-      @Override
-      public void onMerge(OnMergeContext c) throws Exception {
-        StateMerging.mergeBags(c.state(), bufferTag);
-      }
-    };
-  }
-
-  /**
-   * Create a factory that produces {@link SystemReduceFn} instances that combine all of the input
-   * values using a {@link CombineFn}.
-   *
-   * <p>If the wrapped function is a {@link KeyedCombineFnWithContext}, a context-aware combining
-   * state is used. Otherwise, the function is cast to a {@link KeyedCombineFn}.
-   *
-   * @param keyCoder not referenced by this method
-   * @param combineFn the combine function and the coder for its accumulators
-   */
-  public static <K, InputT, AccumT, OutputT, W extends BoundedWindow>
-      SystemReduceFn<K, InputT, AccumT, OutputT, W> combining(
-          final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag;
-    if (combineFn.getFn() instanceof KeyedCombineFnWithContext) {
-      bufferTag = StateTags.makeSystemTagInternal(
-          StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext(
-              BUFFER_NAME, combineFn.getAccumulatorCoder(),
-              (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn()));
-    } else {
-      bufferTag = StateTags.makeSystemTagInternal(
-          StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue(
-              BUFFER_NAME, combineFn.getAccumulatorCoder(),
-              (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn()));
-    }
-    return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) {
-      @Override
-      public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
-        StateMerging.prefetchCombiningValues(state, bufferTag);
-      }
-
-      @Override
-      public void onMerge(OnMergeContext c) throws Exception {
-        StateMerging.mergeCombiningValues(c.state(), bufferTag);
-      }
-    };
-  }
-
-  /** State tag under which input values are accumulated; fixed at construction. */
-  private final StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;
-
-  /**
-   * Creates a reduce function that accumulates input in the state identified by
-   * {@code bufferTag}.
-   */
-  public SystemReduceFn(
-      StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
-    this.bufferTag = bufferTag;
-  }
-
-  /** Adds the incoming value to the buffer state. */
-  @Override
-  public void processValue(ProcessValueContext c) throws Exception {
-    c.state().access(bufferTag).add(c.value());
-  }
-
-  /** Requests an asynchronous read of the buffer ahead of {@link #onTrigger}. */
-  @Override
-  public void prefetchOnTrigger(StateAccessor<K> state) {
-    state.access(bufferTag).readLater();
-  }
-
-  /** Outputs the current contents of the buffer state. */
-  @Override
-  public void onTrigger(OnTriggerContext c) throws Exception {
-    c.output(c.state().access(bufferTag).read());
-  }
-
-  /** Clears the buffer state. */
-  @Override
-  public void clearState(Context c) throws Exception {
-    c.state().access(bufferTag).clear();
-  }
-
-  /** Returns a readable view of whether the buffer state is empty. */
-  @Override
-  public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
-    return state.access(bufferTag).isEmpty();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TestCredential.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TestCredential.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TestCredential.java
deleted file mode 100644
index 359e157..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TestCredential.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.auth.oauth2.BearerToken;
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.auth.oauth2.TokenResponse;
-import com.google.api.client.testing.http.MockHttpTransport;
-
-import java.io.IOException;
-
-/**
- * Fake OAuth2 credential for use in tests.
- *
- * <p>It is built on a {@link MockHttpTransport}. Every "refresh" hands back the same access
- * token, which expires after five minutes.
- */
-public class TestCredential extends Credential {
-  /** Access token used when the no-argument constructor is called. */
-  private static final String DEFAULT_TOKEN = "NULL";
-
-  /** Lifetime, in seconds, reported for each refreshed token (five minutes). */
-  private static final long TOKEN_LIFETIME_SECONDS = 5L * 60;
-
-  private final String token;
-
-  /** Creates a credential whose refreshed access token is {@code "NULL"}. */
-  public TestCredential() {
-    this(DEFAULT_TOKEN);
-  }
-
-  /** Creates a credential whose refreshed access token is {@code token}. */
-  public TestCredential(String token) {
-    super(mockTransportBuilder());
-    this.token = token;
-  }
-
-  /** Returns a bearer-token credential builder backed by a mock HTTP transport. */
-  private static Builder mockTransportBuilder() {
-    return new Builder(BearerToken.authorizationHeaderAccessMethod())
-        .setTransport(new MockHttpTransport());
-  }
-
-  @Override
-  protected TokenResponse executeRefreshToken() throws IOException {
-    return new TokenResponse()
-        .setAccessToken(token)
-        .setExpiresInSeconds(TOKEN_LIFETIME_SECONDS);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeDomain.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeDomain.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeDomain.java
deleted file mode 100644
index 4ff36f7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeDomain.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
/**
 * {@code TimeDomain} specifies whether an operation is based on the timestamps of elements or
 * on the current "real-world" time as reported while processing.
 */
public enum TimeDomain {
  /**
   * The {@code EVENT_TIME} domain corresponds to the timestamps on the elements. Time advances
   * as the system watermark advances.
   */
  EVENT_TIME,

  /**
   * The {@code PROCESSING_TIME} domain corresponds to the current (system) time.
   * This is advanced during execution of the Dataflow pipeline.
   */
  PROCESSING_TIME,

  /**
   * Same as the {@code PROCESSING_TIME} domain, except it won't fire a timer set for time
   * {@code T} until all timers from earlier stages set for a time earlier than {@code T} have
   * fired.
   */
  SYNCHRONIZED_PROCESSING_TIME
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeUtil.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeUtil.java
deleted file mode 100644
index 93195a7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimeUtil.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import org.joda.time.DateTime;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.ReadableDuration;
-import org.joda.time.ReadableInstant;
-import org.joda.time.chrono.ISOChronology;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-/**
- * A helper class for converting between Dataflow API and SDK time
- * representations.
- *
- * <p>Dataflow API times are strings of the form
- * {@code YYYY-MM-dd'T'HH:mm:ss[.nnnn]'Z'}: that is, RFC 3339
- * strings with optional fractional seconds and a 'Z' offset.
- *
- * <p>Dataflow API durations are strings of the form {@code ['-']sssss[.nnnn]'s'}:
- * that is, seconds with optional fractional seconds and a literal 's' at the end.
- *
- * <p>In both formats, fractional seconds are either three digits (millisecond
- * resolution), six digits (microsecond resolution), or nine digits (nanosecond
- * resolution).
- */
-public final class TimeUtil {
- private TimeUtil() {} // Non-instantiable.
-
- private static final Pattern DURATION_PATTERN = Pattern.compile("(\\d+)(?:\\.(\\d+))?s");
- private static final Pattern TIME_PATTERN =
- Pattern.compile("(\\d{4})-(\\d{2})-(\\d{2})T(\\d{2}):(\\d{2}):(\\d{2})(?:\\.(\\d+))?Z");
-
- /**
- * Converts a {@link ReadableInstant} into a Dateflow API time value.
- */
- public static String toCloudTime(ReadableInstant instant) {
- // Note that since Joda objects use millisecond resolution, we always
- // produce either no fractional seconds or fractional seconds with
- // millisecond resolution.
-
- // Translate the ReadableInstant to a DateTime with ISOChronology.
- DateTime time = new DateTime(instant);
-
- int millis = time.getMillisOfSecond();
- if (millis == 0) {
- return String.format("%04d-%02d-%02dT%02d:%02d:%02dZ",
- time.getYear(),
- time.getMonthOfYear(),
- time.getDayOfMonth(),
- time.getHourOfDay(),
- time.getMinuteOfHour(),
- time.getSecondOfMinute());
- } else {
- return String.format("%04d-%02d-%02dT%02d:%02d:%02d.%03dZ",
- time.getYear(),
- time.getMonthOfYear(),
- time.getDayOfMonth(),
- time.getHourOfDay(),
- time.getMinuteOfHour(),
- time.getSecondOfMinute(),
- millis);
- }
- }
-
- /**
- * Converts a time value received via the Dataflow API into the corresponding
- * {@link Instant}.
- * @return the parsed time, or null if a parse error occurs
- */
- @Nullable
- public static Instant fromCloudTime(String time) {
- Matcher matcher = TIME_PATTERN.matcher(time);
- if (!matcher.matches()) {
- return null;
- }
- int year = Integer.valueOf(matcher.group(1));
- int month = Integer.valueOf(matcher.group(2));
- int day = Integer.valueOf(matcher.group(3));
- int hour = Integer.valueOf(matcher.group(4));
- int minute = Integer.valueOf(matcher.group(5));
- int second = Integer.valueOf(matcher.group(6));
- int millis = 0;
-
- String frac = matcher.group(7);
- if (frac != null) {
- int fracs = Integer.valueOf(frac);
- if (frac.length() == 3) { // millisecond resolution
- millis = fracs;
- } else if (frac.length() == 6) { // microsecond resolution
- millis = fracs / 1000;
- } else if (frac.length() == 9) { // nanosecond resolution
- millis = fracs / 1000000;
- } else {
- return null;
- }
- }
-
- return new DateTime(year, month, day, hour, minute, second, millis,
- ISOChronology.getInstanceUTC()).toInstant();
- }
-
- /**
- * Converts a {@link ReadableDuration} into a Dataflow API duration string.
- */
- public static String toCloudDuration(ReadableDuration duration) {
- // Note that since Joda objects use millisecond resolution, we always
- // produce either no fractional seconds or fractional seconds with
- // millisecond resolution.
- long millis = duration.getMillis();
- long seconds = millis / 1000;
- millis = millis % 1000;
- if (millis == 0) {
- return String.format("%ds", seconds);
- } else {
- return String.format("%d.%03ds", seconds, millis);
- }
- }
-
- /**
- * Converts a Dataflow API duration string into a {@link Duration}.
- * @return the parsed duration, or null if a parse error occurs
- */
- @Nullable
- public static Duration fromCloudDuration(String duration) {
- Matcher matcher = DURATION_PATTERN.matcher(duration);
- if (!matcher.matches()) {
- return null;
- }
- long millis = Long.valueOf(matcher.group(1)) * 1000;
- String frac = matcher.group(2);
- if (frac != null) {
- long fracs = Long.valueOf(frac);
- if (frac.length() == 3) { // millisecond resolution
- millis += fracs;
- } else if (frac.length() == 6) { // microsecond resolution
- millis += fracs / 1000;
- } else if (frac.length() == 9) { // nanosecond resolution
- millis += fracs / 1000000;
- } else {
- return null;
- }
- }
- return Duration.millis(millis);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
deleted file mode 100644
index c823ed3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TimerInternals.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.InstantCoder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * Encapsulate interaction with time within the execution environment.
- *
- * <p>This class allows setting and deleting timers, and also retrieving an
- * estimate of the current time.
- */
-public interface TimerInternals {
-
- /**
- * Writes out a timer to be fired when the watermark reaches the given
- * timestamp.
- *
- * <p>The combination of {@code namespace}, {@code timestamp} and {@code domain} uniquely
- * identify a timer. Multiple timers set for the same parameters can be safely deduplicated.
- */
- void setTimer(TimerData timerKey);
-
- /**
- * Deletes the given timer.
- */
- void deleteTimer(TimerData timerKey);
-
- /**
- * Returns the current timestamp in the {@link TimeDomain#PROCESSING_TIME} time domain.
- */
- Instant currentProcessingTime();
-
- /**
- * Returns the current timestamp in the {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} time
- * domain or {@code null} if unknown.
- */
- @Nullable
- Instant currentSynchronizedProcessingTime();
-
- /**
- * Return the current, local input watermark timestamp for this computation
- * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
- *
- * <p>This value:
- * <ol>
- * <li>Is monotonically increasing.
- * <li>May differ between workers due to network and other delays.
- * <li>Will never be ahead of the global input watermark for this computation. But it
- * may be arbitrarily behind the global input watermark.
- * <li>Any element with a timestamp before the local input watermark can be considered
- * 'locally late' and be subject to special processing or be dropped entirely.
- * </ol>
- *
- * <p>Note that because the local input watermark can be behind the global input watermark,
- * it is possible for an element to be considered locally on-time even though it is
- * globally late.
- */
- @Nullable
- Instant currentInputWatermarkTime();
-
- /**
- * Return the current, local output watermark timestamp for this computation
- * in the {@link TimeDomain#EVENT_TIME} time domain. Return {@code null} if unknown.
- *
- * <p>This value:
- * <ol>
- * <li>Is monotonically increasing.
- * <li>Will never be ahead of {@link #currentInputWatermarkTime} as returned above.
- * <li>May differ between workers due to network and other delays.
- * <li>However will never be behind the global input watermark for any following computation.
- * </ol>
- *
- * <p> In pictures:
- * <pre>
- * | | | | |
- * | | D | C | B | A
- * | | | | |
- * GIWM <= GOWM <= LOWM <= LIWM <= GIWM
- * (next stage)
- * -------------------------------------------------> event time
- * </pre>
- * where
- * <ul>
- * <li> LOWM = local output water mark.
- * <li> GOWM = global output water mark.
- * <li> GIWM = global input water mark.
- * <li> LIWM = local input water mark.
- * <li> A = A globally on-time element.
- * <li> B = A globally late, but locally on-time element.
- * <li> C = A locally late element which may still contribute to the timestamp of a pane.
- * <li> D = A locally late element which cannot contribute to the timestamp of a pane.
- * </ul>
- *
- * <p>Note that if a computation emits an element which is not before the current output watermark
- * then that element will always appear locally on-time in all following computations. However,
- * it is possible for an element emitted before the current output watermark to appear locally
- * on-time in a following computation. Thus we must be careful to never assume locally late data
- * viewed on the output of a computation remains locally late on the input of a following
- * computation.
- */
- @Nullable
- Instant currentOutputWatermarkTime();
-
- /**
- * Data about a timer as represented within {@link TimerInternals}.
- */
- public static class TimerData implements Comparable<TimerData> {
- private final StateNamespace namespace;
- private final Instant timestamp;
- private final TimeDomain domain;
-
- private TimerData(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
- this.namespace = checkNotNull(namespace);
- this.timestamp = checkNotNull(timestamp);
- this.domain = checkNotNull(domain);
- }
-
- public StateNamespace getNamespace() {
- return namespace;
- }
-
- public Instant getTimestamp() {
- return timestamp;
- }
-
- public TimeDomain getDomain() {
- return domain;
- }
-
- /**
- * Construct the {@code TimerKey} for the given parameters.
- */
- public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
- return new TimerData(namespace, timestamp, domain);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (!(obj instanceof TimerData)) {
- return false;
- }
-
- TimerData that = (TimerData) obj;
- return Objects.equals(this.domain, that.domain)
- && this.timestamp.isEqual(that.timestamp)
- && Objects.equals(this.namespace, that.namespace);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(domain, timestamp, namespace);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("namespace", namespace)
- .add("timestamp", timestamp)
- .add("domain", domain)
- .toString();
- }
-
- @Override
- public int compareTo(TimerData o) {
- return Long.compare(timestamp.getMillis(), o.getTimestamp().getMillis());
- }
- }
-
- /**
- * A {@link Coder} for {@link TimerData}.
- */
- public class TimerDataCoder extends StandardCoder<TimerData> {
- private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
- private static final InstantCoder INSTANT_CODER = InstantCoder.of();
- private final Coder<? extends BoundedWindow> windowCoder;
-
- public static TimerDataCoder of(Coder<? extends BoundedWindow> windowCoder) {
- return new TimerDataCoder(windowCoder);
- }
-
- @SuppressWarnings("unchecked")
- @JsonCreator
- public static TimerDataCoder of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- Preconditions.checkArgument(components.size() == 1,
- "Expecting 1 components, got " + components.size());
- return of((Coder<? extends BoundedWindow>) components.get(0));
- }
-
- private TimerDataCoder(Coder<? extends BoundedWindow> windowCoder) {
- this.windowCoder = windowCoder;
- }
-
- @Override
- public void encode(TimerData timer, OutputStream outStream, Context context)
- throws CoderException, IOException {
- Context nestedContext = context.nested();
- STRING_CODER.encode(timer.namespace.stringKey(), outStream, nestedContext);
- INSTANT_CODER.encode(timer.timestamp, outStream, nestedContext);
- STRING_CODER.encode(timer.domain.name(), outStream, nestedContext);
- }
-
- @Override
- public TimerData decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- Context nestedContext = context.nested();
- StateNamespace namespace =
- StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder);
- Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
- TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, nestedContext));
- return TimerData.of(namespace, timestamp, domain);
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Arrays.asList(windowCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- verifyDeterministic("window coder must be deterministic", windowCoder);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
deleted file mode 100644
index 7d4b4f2..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Timers.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * Interface for interacting with time.
- */
-@Experimental(Experimental.Kind.TIMERS)
-public interface Timers {
- /**
- * Sets a timer to fire when the event time watermark, the current processing time, or
- * the synchronized processing time watermark surpasses a given timestamp.
- *
- * <p>See {@link TimeDomain} for details on the time domains available.
- *
- * <p>Timers are not guaranteed to fire immediately, but will be delivered at some time
- * afterwards.
- *
- * <p>An implementation of {@link Timers} implicitly scopes timers that are set - they may
- * be scoped to a key and window, or a key, window, and trigger, etc.
- *
- * @param timestamp the time at which the timer should be delivered
- * @param timeDomain the domain that the {@code timestamp} applies to
- */
- public abstract void setTimer(Instant timestamp, TimeDomain timeDomain);
-
- /** Removes the timer set in this context for the {@code timestmap} and {@code timeDomain}. */
- public abstract void deleteTimer(Instant timestamp, TimeDomain timeDomain);
-
- /** Returns the current processing time. */
- public abstract Instant currentProcessingTime();
-
- /** Returns the current synchronized processing time or {@code null} if unknown. */
- @Nullable
- public abstract Instant currentSynchronizedProcessingTime();
-
- /** Returns the current event time or {@code null} if unknown. */
- @Nullable
- public abstract Instant currentEventTime();
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
deleted file mode 100644
index 15fe286..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*******************************************************************************
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.clouddebugger.v2.Clouddebugger;
-import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.storage.Storage;
-import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.GcsOptions;
-import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
-import com.google.common.collect.ImmutableList;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.security.GeneralSecurityException;
-
-/**
- * Helpers for cloud communication.
- */
-public class Transport {
-
- private static class SingletonHelper {
- /** Global instance of the JSON factory. */
- private static final JsonFactory JSON_FACTORY;
-
- /** Global instance of the HTTP transport. */
- private static final HttpTransport HTTP_TRANSPORT;
-
- static {
- try {
- JSON_FACTORY = JacksonFactory.getDefaultInstance();
- HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();
- } catch (GeneralSecurityException | IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public static HttpTransport getTransport() {
- return SingletonHelper.HTTP_TRANSPORT;
- }
-
- public static JsonFactory getJsonFactory() {
- return SingletonHelper.JSON_FACTORY;
- }
-
- private static class ApiComponents {
- public String rootUrl;
- public String servicePath;
-
- public ApiComponents(String root, String path) {
- this.rootUrl = root;
- this.servicePath = path;
- }
- }
-
- private static ApiComponents apiComponentsFromUrl(String urlString) {
- try {
- URL url = new URL(urlString);
- String rootUrl = url.getProtocol() + "://" + url.getHost() +
- (url.getPort() > 0 ? ":" + url.getPort() : "");
- return new ApiComponents(rootUrl, url.getPath());
- } catch (MalformedURLException e) {
- throw new RuntimeException("Invalid URL: " + urlString);
- }
- }
-
- /**
- * Returns a BigQuery client builder.
- *
- * <p>Note: this client's endpoint is <b>not</b> modified by the
- * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option.
- */
- public static Bigquery.Builder
- newBigQueryClient(BigQueryOptions options) {
- return new Bigquery.Builder(getTransport(), getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
- new RetryHttpRequestInitializer(ImmutableList.of(404))))
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- /**
- * Returns a Pubsub client builder.
- *
- * <p>Note: this client's endpoint is <b>not</b> modified by the
- * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option.
- */
- public static Pubsub.Builder
- newPubsubClient(DataflowPipelineOptions options) {
- return new Pubsub.Builder(getTransport(), getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
- new RetryHttpRequestInitializer(ImmutableList.of(404))))
- .setRootUrl(options.getPubsubRootUrl())
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- /**
- * Returns a Google Cloud Dataflow client builder.
- */
- public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
- String servicePath = options.getDataflowEndpoint();
- ApiComponents components;
- if (servicePath.contains("://")) {
- components = apiComponentsFromUrl(servicePath);
- } else {
- components = new ApiComponents(options.getApiRootUrl(), servicePath);
- }
-
- return new Dataflow.Builder(getTransport(),
- getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
- // Do not log 404. It clutters the output and is possibly even required by the caller.
- new RetryHttpRequestInitializer(ImmutableList.of(404))))
- .setApplicationName(options.getAppName())
- .setRootUrl(components.rootUrl)
- .setServicePath(components.servicePath)
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
- return new Clouddebugger.Builder(getTransport(),
- getJsonFactory(),
- chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- /**
- * Returns a Dataflow client that does not automatically retry failed
- * requests.
- */
- public static Dataflow.Builder
- newRawDataflowClient(DataflowPipelineOptions options) {
- return newDataflowClient(options)
- .setHttpRequestInitializer(options.getGcpCredential())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- }
-
- /**
- * Returns a Cloud Storage client builder.
- *
- * <p>Note: this client's endpoint is <b>not</b> modified by the
- * {@link DataflowPipelineDebugOptions#getApiRootUrl()} option.
- */
- public static Storage.Builder
- newStorageClient(GcsOptions options) {
- String servicePath = options.getGcsEndpoint();
- Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(),
- chainHttpRequestInitializer(
- options.getGcpCredential(),
- // Do not log the code 404. Code up the stack will deal with 404's if needed, and
- // logging it by default clutters the output during file staging.
- new RetryHttpRequestInitializer(
- ImmutableList.of(404), new UploadIdResponseInterceptor())))
- .setApplicationName(options.getAppName())
- .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
- if (servicePath != null) {
- ApiComponents components = apiComponentsFromUrl(servicePath);
- storageBuilder.setRootUrl(components.rootUrl);
- storageBuilder.setServicePath(components.servicePath);
- }
- return storageBuilder;
- }
-
- private static HttpRequestInitializer chainHttpRequestInitializer(
- Credential credential, HttpRequestInitializer httpRequestInitializer) {
- if (credential == null) {
- return httpRequestInitializer;
- } else {
- return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
deleted file mode 100644
index 64ff402..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerContextFactory.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.MergingTriggerInfo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.TriggerInfo;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.State;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Factory for creating instances of the various {@link Trigger} contexts.
- *
- * <p>These contexts are highly interdependent and share many fields; it is inadvisable
- * to create them via any means other than this factory class.
- */
-public class TriggerContextFactory<W extends BoundedWindow> {
-
- private final WindowingStrategy<?, W> windowingStrategy;
- private StateInternals<?> stateInternals;
- // Future triggers may be able to exploit the active window to state address window mapping.
- @SuppressWarnings("unused")
- private ActiveWindowSet<W> activeWindows;
- private final Coder<W> windowCoder;
-
- public TriggerContextFactory(WindowingStrategy<?, W> windowingStrategy,
- StateInternals<?> stateInternals, ActiveWindowSet<W> activeWindows) {
- this.windowingStrategy = windowingStrategy;
- this.stateInternals = stateInternals;
- this.activeWindows = activeWindows;
- this.windowCoder = windowingStrategy.getWindowFn().windowCoder();
- }
-
- public Trigger<W>.TriggerContext base(W window, Timers timers,
- ExecutableTrigger<W> rootTrigger, FinishedTriggers finishedSet) {
- return new TriggerContextImpl(window, timers, rootTrigger, finishedSet);
- }
-
- public Trigger<W>.OnElementContext createOnElementContext(
- W window, Timers timers, Instant elementTimestamp,
- ExecutableTrigger<W> rootTrigger, FinishedTriggers finishedSet) {
- return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp);
- }
-
- public Trigger<W>.OnMergeContext createOnMergeContext(W window, Timers timers,
- ExecutableTrigger<W> rootTrigger, FinishedTriggers finishedSet,
- Map<W, FinishedTriggers> finishedSets) {
- return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets);
- }
-
- public StateAccessor<?> createStateAccessor(W window, ExecutableTrigger<W> trigger) {
- return new StateAccessorImpl(window, trigger);
- }
-
- public MergingStateAccessor<?, W> createMergingStateAccessor(
- W mergeResult, Collection<W> mergingWindows, ExecutableTrigger<W> trigger) {
- return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult);
- }
-
- private class TriggerInfoImpl implements Trigger.TriggerInfo<W> {
-
- protected final ExecutableTrigger<W> trigger;
- protected final FinishedTriggers finishedSet;
- private final Trigger<W>.TriggerContext context;
-
- public TriggerInfoImpl(ExecutableTrigger<W> trigger, FinishedTriggers finishedSet,
- Trigger<W>.TriggerContext context) {
- this.trigger = trigger;
- this.finishedSet = finishedSet;
- this.context = context;
- }
-
- @Override
- public boolean isMerging() {
- return !windowingStrategy.getWindowFn().isNonMerging();
- }
-
- @Override
- public Iterable<ExecutableTrigger<W>> subTriggers() {
- return trigger.subTriggers();
- }
-
- @Override
- public ExecutableTrigger<W> subTrigger(int subtriggerIndex) {
- return trigger.subTriggers().get(subtriggerIndex);
- }
-
- @Override
- public boolean isFinished() {
- return finishedSet.isFinished(trigger);
- }
-
- @Override
- public boolean isFinished(int subtriggerIndex) {
- return finishedSet.isFinished(subTrigger(subtriggerIndex));
- }
-
- @Override
- public boolean areAllSubtriggersFinished() {
- return Iterables.isEmpty(unfinishedSubTriggers());
- }
-
- @Override
- public Iterable<ExecutableTrigger<W>> unfinishedSubTriggers() {
- return FluentIterable
- .from(trigger.subTriggers())
- .filter(new Predicate<ExecutableTrigger<W>>() {
- @Override
- public boolean apply(ExecutableTrigger<W> trigger) {
- return !finishedSet.isFinished(trigger);
- }
- });
- }
-
- @Override
- public ExecutableTrigger<W> firstUnfinishedSubTrigger() {
- for (ExecutableTrigger<W> subTrigger : trigger.subTriggers()) {
- if (!finishedSet.isFinished(subTrigger)) {
- return subTrigger;
- }
- }
- return null;
- }
-
- @Override
- public void resetTree() throws Exception {
- finishedSet.clearRecursively(trigger);
- trigger.invokeClear(context);
- }
-
- @Override
- public void setFinished(boolean finished) {
- finishedSet.setFinished(trigger, finished);
- }
-
- @Override
- public void setFinished(boolean finished, int subTriggerIndex) {
- finishedSet.setFinished(subTrigger(subTriggerIndex), finished);
- }
- }
-
- private class TriggerTimers implements Timers {
-
- private final Timers timers;
- private final W window;
-
- public TriggerTimers(W window, Timers timers) {
- this.timers = timers;
- this.window = window;
- }
-
- @Override
- public void setTimer(Instant timestamp, TimeDomain timeDomain) {
- timers.setTimer(timestamp, timeDomain);
- }
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
- if (timeDomain == TimeDomain.EVENT_TIME
- && timestamp.equals(window.maxTimestamp())) {
- // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time
- // state transitions.
- return;
- }
- timers.deleteTimer(timestamp, timeDomain);
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timers.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timers.currentSynchronizedProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentEventTime() {
- return timers.currentEventTime();
- }
- }
-
- private class MergingTriggerInfoImpl
- extends TriggerInfoImpl implements Trigger.MergingTriggerInfo<W> {
-
- private final Map<W, FinishedTriggers> finishedSets;
-
- public MergingTriggerInfoImpl(
- ExecutableTrigger<W> trigger,
- FinishedTriggers finishedSet,
- Trigger<W>.TriggerContext context,
- Map<W, FinishedTriggers> finishedSets) {
- super(trigger, finishedSet, context);
- this.finishedSets = finishedSets;
- }
-
- @Override
- public boolean finishedInAnyMergingWindow() {
- for (FinishedTriggers finishedSet : finishedSets.values()) {
- if (finishedSet.isFinished(trigger)) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public boolean finishedInAllMergingWindows() {
- for (FinishedTriggers finishedSet : finishedSets.values()) {
- if (!finishedSet.isFinished(trigger)) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public Iterable<W> getFinishedMergingWindows() {
- return Maps.filterValues(finishedSets, new Predicate<FinishedTriggers>() {
- @Override
- public boolean apply(FinishedTriggers finishedSet) {
- return finishedSet.isFinished(trigger);
- }
- }).keySet();
- }
- }
-
- private class StateAccessorImpl implements StateAccessor<Object> {
- protected final int triggerIndex;
- protected final StateNamespace windowNamespace;
-
- public StateAccessorImpl(
- W window,
- ExecutableTrigger<W> trigger) {
- this.triggerIndex = trigger.getTriggerIndex();
- this.windowNamespace = namespaceFor(window);
- }
-
- protected StateNamespace namespaceFor(W window) {
- return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex);
- }
-
- @Override
- public <StateT extends State> StateT access(StateTag<? super Object, StateT> address) {
- return stateInternals.state(windowNamespace, address);
- }
- }
-
- private class MergingStateAccessorImpl extends StateAccessorImpl
- implements MergingStateAccessor<Object, W> {
- private final Collection<W> activeToBeMerged;
-
- public MergingStateAccessorImpl(ExecutableTrigger<W> trigger, Collection<W> activeToBeMerged,
- W mergeResult) {
- super(mergeResult, trigger);
- this.activeToBeMerged = activeToBeMerged;
- }
-
- @Override
- public <StateT extends State> StateT access(
- StateTag<? super Object, StateT> address) {
- return stateInternals.state(windowNamespace, address);
- }
-
- @Override
- public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
- StateTag<? super Object, StateT> address) {
- ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
- for (W mergingWindow : activeToBeMerged) {
- StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address);
- builder.put(mergingWindow, stateForWindow);
- }
- return builder.build();
- }
- }
-
- private class TriggerContextImpl extends Trigger<W>.TriggerContext {
-
- private final W window;
- private final StateAccessorImpl state;
- private final Timers timers;
- private final TriggerInfoImpl triggerInfo;
-
- private TriggerContextImpl(
- W window,
- Timers timers,
- ExecutableTrigger<W> trigger,
- FinishedTriggers finishedSet) {
- trigger.getSpec().super();
- this.window = window;
- this.state = new StateAccessorImpl(window, trigger);
- this.timers = new TriggerTimers(window, timers);
- this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
- }
-
- @Override
- public Trigger<W>.TriggerContext forTrigger(ExecutableTrigger<W> trigger) {
- return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet);
- }
-
- @Override
- public TriggerInfo<W> trigger() {
- return triggerInfo;
- }
-
- @Override
- public StateAccessor state() {
- return state;
- }
-
- @Override
- public W window() {
- return window;
- }
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain domain) {
- timers.deleteTimer(timestamp, domain);
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timers.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timers.currentSynchronizedProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentEventTime() {
- return timers.currentEventTime();
- }
- }
-
- private class OnElementContextImpl extends Trigger<W>.OnElementContext {
-
- private final W window;
- private final StateAccessorImpl state;
- private final Timers timers;
- private final TriggerInfoImpl triggerInfo;
- private final Instant eventTimestamp;
-
- private OnElementContextImpl(
- W window,
- Timers timers,
- ExecutableTrigger<W> trigger,
- FinishedTriggers finishedSet,
- Instant eventTimestamp) {
- trigger.getSpec().super();
- this.window = window;
- this.state = new StateAccessorImpl(window, trigger);
- this.timers = new TriggerTimers(window, timers);
- this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this);
- this.eventTimestamp = eventTimestamp;
- }
-
-
- @Override
- public Instant eventTimestamp() {
- return eventTimestamp;
- }
-
- @Override
- public Trigger<W>.OnElementContext forTrigger(ExecutableTrigger<W> trigger) {
- return new OnElementContextImpl(
- window, timers, trigger, triggerInfo.finishedSet, eventTimestamp);
- }
-
- @Override
- public TriggerInfo<W> trigger() {
- return triggerInfo;
- }
-
- @Override
- public StateAccessor state() {
- return state;
- }
-
- @Override
- public W window() {
- return window;
- }
-
- @Override
- public void setTimer(Instant timestamp, TimeDomain domain) {
- timers.setTimer(timestamp, domain);
- }
-
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain domain) {
- timers.deleteTimer(timestamp, domain);
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timers.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timers.currentSynchronizedProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentEventTime() {
- return timers.currentEventTime();
- }
- }
-
- private class OnMergeContextImpl extends Trigger<W>.OnMergeContext {
- private final MergingStateAccessor<?, W> state;
- private final W window;
- private final Collection<W> mergingWindows;
- private final Timers timers;
- private final MergingTriggerInfoImpl triggerInfo;
-
- private OnMergeContextImpl(
- W window,
- Timers timers,
- ExecutableTrigger<W> trigger,
- FinishedTriggers finishedSet,
- Map<W, FinishedTriggers> finishedSets) {
- trigger.getSpec().super();
- this.mergingWindows = finishedSets.keySet();
- this.window = window;
- this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window);
- this.timers = new TriggerTimers(window, timers);
- this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets);
- }
-
- @Override
- public Trigger<W>.OnMergeContext forTrigger(ExecutableTrigger<W> trigger) {
- return new OnMergeContextImpl(
- window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets);
- }
-
- @Override
- public MergingStateAccessor<?, W> state() {
- return state;
- }
-
- @Override
- public MergingTriggerInfo<W> trigger() {
- return triggerInfo;
- }
-
- @Override
- public W window() {
- return window;
- }
-
- @Override
- public void setTimer(Instant timestamp, TimeDomain domain) {
- timers.setTimer(timestamp, domain);
- }
-
- @Override
- public void deleteTimer(Instant timestamp, TimeDomain domain) {
- timers.setTimer(timestamp, domain);
-
- }
-
- @Override
- public Instant currentProcessingTime() {
- return timers.currentProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return timers.currentSynchronizedProcessingTime();
- }
-
- @Override
- @Nullable
- public Instant currentEventTime() {
- return timers.currentEventTime();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
deleted file mode 100644
index dcfd035..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-import com.google.cloud.dataflow.sdk.util.state.ValueState;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Instant;
-
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Executes a trigger while managing persistence of information about which subtriggers are
- * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
- *
- * <p>Specifically, the responsibilities are:
- *
- * <ul>
- * <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
- * constructing the appropriate trigger contexts.</li>
- * <li>Committing a record of which subtriggers are finished to persistent state.</li>
- * <li>Restoring the record of which subtriggers are finished from persistent state.</li>
- * <li>Clearing out the persisted finished set when a caller indicates
- * (via {#link #clearFinished}) that it is no longer needed.</li>
- * </ul>
- *
- * <p>These responsibilities are intertwined: trigger contexts include mutable information about
- * which subtriggers are finished. This class provides the information when building the contexts
- * and commits the information when the method of the {@link ExecutableTrigger} returns.
- *
- * @param <W> The kind of windows being processed.
- */
-public class TriggerRunner<W extends BoundedWindow> {
- @VisibleForTesting
- static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
- StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
-
- private final ExecutableTrigger<W> rootTrigger;
- private final TriggerContextFactory<W> contextFactory;
-
- public TriggerRunner(ExecutableTrigger<W> rootTrigger, TriggerContextFactory<W> contextFactory) {
- Preconditions.checkState(rootTrigger.getTriggerIndex() == 0);
- this.rootTrigger = rootTrigger;
- this.contextFactory = contextFactory;
- }
-
- private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
- if (!isFinishedSetNeeded()) {
- // If no trigger in the tree will ever have finished bits, then we don't need to read them.
- // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
- // finished) for each trigger in the tree.
- return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
- }
-
- BitSet bitSet = state.read();
- return bitSet == null
- ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
- : FinishedTriggersBitSet.fromBitSet(bitSet);
- }
-
- /** Return true if the trigger is closed in the window corresponding to the specified state. */
- public boolean isClosed(StateAccessor<?> state) {
- return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
- }
-
- public void prefetchForValue(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
- rootTrigger.getSpec().prefetchOnElement(
- contextFactory.createStateAccessor(window, rootTrigger));
- }
-
- public void prefetchOnFire(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
- rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
- }
-
- public void prefetchShouldFire(W window, StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).readLater();
- }
- rootTrigger.getSpec().prefetchShouldFire(
- contextFactory.createStateAccessor(window, rootTrigger));
- }
-
- /**
- * Run the trigger logic to deal with a new value.
- */
- public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
- throws Exception {
- // Clone so that we can detect changes and so that changes here don't pollute merging.
- FinishedTriggersBitSet finishedSet =
- readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
- Trigger<W>.OnElementContext triggerContext = contextFactory.createOnElementContext(
- window, timers, timestamp, rootTrigger, finishedSet);
- rootTrigger.invokeOnElement(triggerContext);
- persistFinishedSet(state, finishedSet);
- }
-
- public void prefetchForMerge(
- W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
- if (isFinishedSetNeeded()) {
- for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
- value.readLater();
- }
- }
- rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
- window, mergingWindows, rootTrigger));
- }
-
- /**
- * Run the trigger merging logic as part of executing the specified merge.
- */
- public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
- // Clone so that we can detect changes and so that changes here don't pollute merging.
- FinishedTriggersBitSet finishedSet =
- readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-
- // And read the finished bits in each merging window.
- ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
- for (Map.Entry<W, ValueState<BitSet>> entry :
- state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
- // Don't need to clone these, since the trigger context doesn't allow modification
- builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
- }
- ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
-
- Trigger<W>.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
- window, timers, rootTrigger, finishedSet, mergingFinishedSets);
-
- // Run the merge from the trigger
- rootTrigger.invokeOnMerge(mergeContext);
-
- persistFinishedSet(state, finishedSet);
-
- // Clear the finished bits.
- clearFinished(state);
- }
-
- public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
- FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
- Trigger<W>.TriggerContext context = contextFactory.base(window, timers,
- rootTrigger, finishedSet);
- return rootTrigger.invokeShouldFire(context);
- }
-
- public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
- FinishedTriggersBitSet finishedSet =
- readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
- Trigger<W>.TriggerContext context = contextFactory.base(window, timers,
- rootTrigger, finishedSet);
- rootTrigger.invokeOnFire(context);
- persistFinishedSet(state, finishedSet);
- }
-
- private void persistFinishedSet(
- StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
- if (!isFinishedSetNeeded()) {
- return;
- }
-
- ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
- if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
- if (modifiedFinishedSet.getBitSet().isEmpty()) {
- finishedSetState.clear();
- } else {
- finishedSetState.write(modifiedFinishedSet.getBitSet());
- }
- }
- }
-
- /**
- * Clear finished bits.
- */
- public void clearFinished(StateAccessor<?> state) {
- if (isFinishedSetNeeded()) {
- state.access(FINISHED_BITS_TAG).clear();
- }
- }
-
- /**
- * Clear the state used for executing triggers, but leave the finished set to indicate
- * the window is closed.
- */
- public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
- // Don't need to clone, because we'll be clearing the finished bits anyways.
- FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
- rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
- }
-
- private boolean isFinishedSetNeeded() {
- // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
- // lookup. Right now, we special case this for the DefaultTrigger.
- return !(rootTrigger.getSpec() instanceof DefaultTrigger);
- }
-}