You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 23:44:17 UTC
[3/4] beam git commit: Adds coders for boolean,
ResourceId and Metadata
Adds coders for boolean, ResourceId and Metadata
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e43b238
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e43b238
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e43b238
Branch: refs/heads/master
Commit: 5e43b2388652f38a37ab3378a63ae88e6ad53ee3
Parents: 8d337ff
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 14:42:07 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:23 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/BooleanCoder.java | 59 ++++++++++++++++++
.../apache/beam/sdk/coders/CoderRegistry.java | 10 ++++
.../apache/beam/sdk/io/fs/MetadataCoder.java | 63 ++++++++++++++++++++
.../apache/beam/sdk/io/fs/ResourceIdCoder.java | 56 +++++++++++++++++
.../org/apache/beam/sdk/transforms/Watch.java | 7 ++-
5 files changed, 192 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
new file mode 100644
index 0000000..e7f7543
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/** A {@link Coder} for {@link Boolean}. */
+public class BooleanCoder extends AtomicCoder<Boolean> {
+ private static final ByteCoder BYTE_CODER = ByteCoder.of();
+
+ private static final BooleanCoder INSTANCE = new BooleanCoder();
+
+ /** Returns the singleton instance of {@link BooleanCoder}. */
+ public static BooleanCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(Boolean value, OutputStream os) throws IOException {
+ BYTE_CODER.encode(value ? (byte) 1 : 0, os);
+ }
+
+ @Override
+ public Boolean decode(InputStream is) throws IOException {
+ return BYTE_CODER.decode(is) == 1;
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(Boolean value) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(Boolean value) throws Exception {
+ return 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 48389b1..c335bda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -43,6 +43,10 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MetadataCoder;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.CoderUtils;
@@ -89,6 +93,8 @@ public class CoderRegistry {
private CommonTypes() {
ImmutableMap.Builder<Class<?>, CoderProvider> builder = ImmutableMap.builder();
+ builder.put(Boolean.class,
+ CoderProviders.fromStaticMethods(Boolean.class, BooleanCoder.class));
builder.put(Byte.class,
CoderProviders.fromStaticMethods(Byte.class, ByteCoder.class));
builder.put(BitSet.class,
@@ -109,6 +115,10 @@ public class CoderRegistry {
CoderProviders.fromStaticMethods(Long.class, VarLongCoder.class));
builder.put(Map.class,
CoderProviders.fromStaticMethods(Map.class, MapCoder.class));
+ builder.put(Metadata.class,
+ CoderProviders.fromStaticMethods(Metadata.class, MetadataCoder.class));
+ builder.put(ResourceId.class,
+ CoderProviders.fromStaticMethods(ResourceId.class, ResourceIdCoder.class));
builder.put(Set.class,
CoderProviders.fromStaticMethods(Set.class, SetCoder.class));
builder.put(String.class,
http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
new file mode 100644
index 0000000..5c9c4d7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+
+/** A {@link Coder} for {@link Metadata}. */
+public class MetadataCoder extends AtomicCoder<Metadata> {
+ private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of();
+ private static final VarIntCoder INT_CODER = VarIntCoder.of();
+ private static final VarLongCoder LONG_CODER = VarLongCoder.of();
+
+ /** Creates a {@link MetadataCoder}. */
+ public static MetadataCoder of() {
+ return new MetadataCoder();
+ }
+
+ @Override
+ public void encode(Metadata value, OutputStream os) throws IOException {
+ RESOURCE_ID_CODER.encode(value.resourceId(), os);
+ INT_CODER.encode(value.isReadSeekEfficient() ? 1 : 0, os);
+ LONG_CODER.encode(value.sizeBytes(), os);
+ }
+
+ @Override
+ public Metadata decode(InputStream is) throws IOException {
+ ResourceId resourceId = RESOURCE_ID_CODER.decode(is);
+ boolean isReadSeekEfficient = INT_CODER.decode(is) == 1;
+ long sizeBytes = LONG_CODER.decode(is);
+ return Metadata.builder()
+ .setResourceId(resourceId)
+ .setIsReadSeekEfficient(isReadSeekEfficient)
+ .setSizeBytes(sizeBytes)
+ .build();
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
new file mode 100644
index 0000000..d7649c0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileSystems;
+
+/** A {@link Coder} for {@link ResourceId}. */
+public class ResourceIdCoder extends AtomicCoder<ResourceId> {
+ private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+ private static final Coder<Boolean> BOOL_CODER = BooleanCoder.of();
+
+ /** Creates a {@link ResourceIdCoder}. */
+ public static ResourceIdCoder of() {
+ return new ResourceIdCoder();
+ }
+
+ @Override
+ public void encode(ResourceId value, OutputStream os) throws IOException {
+ STRING_CODER.encode(value.toString(), os);
+ BOOL_CODER.encode(value.isDirectory(), os);
+ }
+
+ @Override
+ public ResourceId decode(InputStream is) throws IOException {
+ String spec = STRING_CODER.decode(is);
+ boolean isDirectory = BOOL_CODER.decode(is);
+ return FileSystems.matchNewResource(spec, isDirectory);
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index b21eb62..fc6f18d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -47,6 +47,7 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DurationCoder;
@@ -958,7 +959,7 @@ public class Watch {
return new GrowthStateCoder<>(outputCoder, terminationStateCoder);
}
- private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+ private static final Coder<Boolean> BOOLEAN_CODER = BooleanCoder.of();
private static final Coder<Instant> INSTANT_CODER = NullableCoder.of(InstantCoder.of());
private static final Coder<HashCode> HASH_CODE_CODER = HashCode128Coder.of();
@@ -980,7 +981,7 @@ public class Watch {
throws IOException {
completedCoder.encode(value.completed, os);
pendingCoder.encode(value.pending, os);
- INT_CODER.encode(value.isOutputComplete ? 1 : 0, os);
+ BOOLEAN_CODER.encode(value.isOutputComplete, os);
terminationStateCoder.encode(value.terminationState, os);
INSTANT_CODER.encode(value.pollWatermark, os);
}
@@ -989,7 +990,7 @@ public class Watch {
public GrowthState<OutputT, TerminationStateT> decode(InputStream is) throws IOException {
Map<HashCode, Instant> completed = completedCoder.decode(is);
List<TimestampedValue<OutputT>> pending = pendingCoder.decode(is);
- boolean isOutputComplete = (INT_CODER.decode(is) == 1);
+ boolean isOutputComplete = BOOLEAN_CODER.decode(is);
TerminationStateT terminationState = terminationStateCoder.decode(is);
Instant pollWatermark = INSTANT_CODER.decode(is);
return new GrowthState<>(