You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/01/10 18:34:15 UTC
[beam] branch master updated: [BEAM-2863] Create an
EncodedBoundedWindow allowing runners to pass around an encoded window
instead of decoding each window.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 86e824f [BEAM-2863] Create an EncodedBoundedWindow allowing runners to pass around an encoded window instead of decoding each window.
86e824f is described below
commit 86e824f0e97376f72df8ed5054a35fb7ed0650e3
Author: Luke Cwik <lc...@google.com>
AuthorDate: Mon Jan 8 16:17:32 2018 -0800
[BEAM-2863] Create an EncodedBoundedWindow allowing runners to pass around an encoded window instead of decoding each window.
---
.../sdk/fn/windowing/EncodedBoundedWindow.java | 94 ++++++++++++++++++++++
.../apache/beam/sdk/fn/windowing/package-info.java | 22 +++++
.../sdk/fn/windowing/EncodedBoundedWindowTest.java | 46 +++++++++++
3 files changed, 162 insertions(+)
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindow.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindow.java
new file mode 100644
index 0000000..97659e3
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindow.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.windowing;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.VarInt;
+import org.joda.time.Instant;
+
+/**
+ * An encoded {@link BoundedWindow} used within Runners to track window information without
+ * needing to decode the window.
+ *
+ * <p>This allows for Runners to not need to know window format during execution.
+ */
+@AutoValue
+public abstract class EncodedBoundedWindow extends BoundedWindow {
+ public static EncodedBoundedWindow forEncoding(ByteString encodedWindow) {
+ return new AutoValue_EncodedBoundedWindow(encodedWindow);
+ }
+
+ public abstract ByteString getEncodedWindow();
+
+ @Override
+ public Instant maxTimestamp() {
+ throw new UnsupportedOperationException("TODO: Add support for reading the timestamp from "
+ + "the encoded window.");
+ }
+
+ /**
+ * An {@link Coder} for {@link EncodedBoundedWindow}s.
+ *
+ * <p>This is a copy of {@code ByteStringCoder} to prevent a dependency on
+ * {@code beam-java-sdk-extensions-protobuf}.
+ */
+ public static class Coder extends AtomicCoder<EncodedBoundedWindow> {
+ public static final Coder INSTANCE = new Coder();
+
+ // prevent instantiation
+ private Coder() { }
+
+ @Override
+ public void encode(EncodedBoundedWindow value, OutputStream outStream)
+ throws CoderException, IOException {
+ VarInt.encode(value.getEncodedWindow().size(), outStream);
+ value.getEncodedWindow().writeTo(outStream);
+ }
+
+ @Override
+ public EncodedBoundedWindow decode(InputStream inStream) throws CoderException, IOException {
+ int size = VarInt.decodeInt(inStream);
+ ByteString encodedWindow = ByteString.readFrom(ByteStreams.limit(inStream, size));
+ return EncodedBoundedWindow.forEncoding(encodedWindow);
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(EncodedBoundedWindow value) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(EncodedBoundedWindow value) throws Exception {
+ return VarInt.getLength(value.getEncodedWindow().size()) + value.getEncodedWindow().size();
+ }
+ }
+}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/package-info.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/package-info.java
new file mode 100644
index 0000000..9a91836
--- /dev/null
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/windowing/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Common utilities related to windowing during execution of a pipeline.
+ */
+package org.apache.beam.sdk.fn.windowing;
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindowTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindowTest.java
new file mode 100644
index 0000000..c3e4b11
--- /dev/null
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/windowing/EncodedBoundedWindowTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.fn.windowing;
+
+import com.google.protobuf.ByteString;
+import org.apache.beam.sdk.fn.windowing.EncodedBoundedWindow.Coder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link EncodedBoundedWindow}. */
+@RunWith(JUnit4.class)
+public class EncodedBoundedWindowTest {
+ @Test
+ public void testCoder() throws Exception {
+ CoderProperties.coderSerializable(Coder.INSTANCE);
+ CoderProperties.coderConsistentWithEquals(Coder.INSTANCE,
+ EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})),
+ EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})));
+ CoderProperties.coderDecodeEncodeEqual(Coder.INSTANCE,
+ EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})));
+ CoderProperties.coderDeterministic(
+ Coder.INSTANCE,
+ EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})),
+ EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})));
+ CoderProperties.structuralValueDecodeEncodeEqual(Coder.INSTANCE,
+ EncodedBoundedWindow.forEncoding(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x03})));
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].