You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/01/10 18:34:15 UTC

[beam] branch master updated: [BEAM-2863] Create an EncodedBoundedWindow allowing runners to pass around an encoded window instead of decoding each window.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 86e824f  [BEAM-2863] Create an EncodedBoundedWindow allowing runners to pass around an encoded window instead of decoding each window.
86e824f is described below

commit 86e824f0e97376f72df8ed5054a35fb7ed0650e3
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Jan 8 16:17:32 2018 -0800

    [BEAM-2863] Create an EncodedBoundedWindow allowing runners to pass around an encoded window instead of decoding each window.
---
 .../sdk/fn/windowing/EncodedBoundedWindow.java     | 94 ++++++++++++++++++++++
 .../apache/beam/sdk/fn/windowing/package-info.java | 22 +++++
 .../sdk/fn/windowing/EncodedBoundedWindowTest.java | 46 +++++++++++
 3 files changed, 162 insertions(+)

diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindow.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindow.java
new file mode 100644
index 0000000..97659e3
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindow.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.windowing;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.VarInt;
+import org.joda.time.Instant;
+
+/**
+ * An encoded {@link BoundedWindow} used within Runners to track window information without
+ * needing to decode the window.
+ *
+ * <p>This allows for Runners to not need to know window format during execution.
+ */
+@AutoValue
+public abstract class EncodedBoundedWindow extends BoundedWindow {
+  public static EncodedBoundedWindow forEncoding(ByteString encodedWindow) {
+    return new AutoValue_EncodedBoundedWindow(encodedWindow);
+  }
+
+  public abstract ByteString getEncodedWindow();
+
+  @Override
+  public Instant maxTimestamp() {
+    throw new UnsupportedOperationException("TODO: Add support for reading the timestamp from "
+        + "the encoded window.");
+  }
+
+  /**
+   * An {@link Coder} for {@link EncodedBoundedWindow}s.
+   *
+   * <p>This is a copy of {@code ByteStringCoder} to prevent a dependency on
+   * {@code beam-java-sdk-extensions-protobuf}.
+   */
+  public static class Coder extends AtomicCoder<EncodedBoundedWindow> {
+    public static final Coder INSTANCE = new Coder();
+
+    // prevent instantiation
+    private Coder() { }
+
+    @Override
+    public void encode(EncodedBoundedWindow value, OutputStream outStream)
+        throws CoderException, IOException {
+      VarInt.encode(value.getEncodedWindow().size(), outStream);
+      value.getEncodedWindow().writeTo(outStream);
+    }
+
+    @Override
+    public EncodedBoundedWindow decode(InputStream inStream) throws CoderException, IOException {
+      int size = VarInt.decodeInt(inStream);
+      ByteString encodedWindow = ByteString.readFrom(ByteStreams.limit(inStream, size));
+      return EncodedBoundedWindow.forEncoding(encodedWindow);
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public boolean isRegisterByteSizeObserverCheap(EncodedBoundedWindow value) {
+      return true;
+    }
+
+    @Override
+    protected long getEncodedElementByteSize(EncodedBoundedWindow value) throws Exception {
+      return VarInt.getLength(value.getEncodedWindow().size()) + value.getEncodedWindow().size();
+    }
+  }
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/package-info.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/package-info.java
new file mode 100644
index 0000000..9a91836
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common utilities related to windowing during execution of a pipeline.
+ */
+package org.apache.beam.sdk.fn.windowing;
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindowTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindowTest.java
new file mode 100644
index 0000000..c3e4b11
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindowTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.windowing;
+
+import com.google.protobuf.ByteString;
+import org.apache.beam.sdk.fn.windowing.EncodedBoundedWindow.Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link EncodedBoundedWindow}. */
+@RunWith(JUnit4.class)
+public class EncodedBoundedWindowTest {
+  @Test
+  public void testCoder() throws Exception {
+    CoderProperties.coderSerializable(Coder.INSTANCE);
+    CoderProperties.coderConsistentWithEquals(Coder.INSTANCE,
+        EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})),
+        EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})));
+    CoderProperties.coderDecodeEncodeEqual(Coder.INSTANCE,
+        EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})));
+    CoderProperties.coderDeterministic(
+        Coder.INSTANCE,
+        EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})),
+        EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})));
+    CoderProperties.structuralValueDecodeEncodeEqual(Coder.INSTANCE,
+        EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})));
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].