You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 23:44:17 UTC

[3/4] beam git commit: Adds coders for boolean, ResourceId and Metadata

Adds coders for boolean, ResourceId and Metadata


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e43b238
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e43b238
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e43b238

Branch: refs/heads/master
Commit: 5e43b2388652f38a37ab3378a63ae88e6ad53ee3
Parents: 8d337ff
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 14:42:07 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:23 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BooleanCoder.java    | 59 ++++++++++++++++++
 .../apache/beam/sdk/coders/CoderRegistry.java   | 10 ++++
 .../apache/beam/sdk/io/fs/MetadataCoder.java    | 63 ++++++++++++++++++++
 .../apache/beam/sdk/io/fs/ResourceIdCoder.java  | 56 +++++++++++++++++
 .../org/apache/beam/sdk/transforms/Watch.java   |  7 ++-
 5 files changed, 192 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
new file mode 100644
index 0000000..e7f7543
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/** A {@link Coder} for {@link Boolean}, encoded via {@link ByteCoder} as 1 (true) or 0 (false). */
+public class BooleanCoder extends AtomicCoder<Boolean> {
+  private static final ByteCoder BYTE_CODER = ByteCoder.of();
+
+  private static final BooleanCoder INSTANCE = new BooleanCoder();
+
+  /** Returns the shared {@link BooleanCoder} instance; no new object is created per call. */
+  public static BooleanCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(Boolean value, OutputStream os) throws IOException {
+    BYTE_CODER.encode(value ? (byte) 1 : 0, os); // unboxes value: null throws NullPointerException
+  }
+
+  @Override
+  public Boolean decode(InputStream is) throws IOException {
+    return BYTE_CODER.decode(is) == 1; // any decoded byte other than 1 is read as false
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return true;
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(Boolean value) {
+    return true;
+  }
+
+  @Override
+  protected long getEncodedElementByteSize(Boolean value) throws Exception {
+    return 1; // constant; value is not inspected
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 48389b1..c335bda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -43,6 +43,10 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MetadataCoder;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdCoder;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -89,6 +93,8 @@ public class CoderRegistry {
 
     private CommonTypes() {
       ImmutableMap.Builder<Class<?>, CoderProvider> builder = ImmutableMap.builder();
+      builder.put(Boolean.class,
+          CoderProviders.fromStaticMethods(Boolean.class, BooleanCoder.class));
       builder.put(Byte.class,
           CoderProviders.fromStaticMethods(Byte.class, ByteCoder.class));
       builder.put(BitSet.class,
@@ -109,6 +115,10 @@ public class CoderRegistry {
           CoderProviders.fromStaticMethods(Long.class, VarLongCoder.class));
       builder.put(Map.class,
           CoderProviders.fromStaticMethods(Map.class, MapCoder.class));
+      builder.put(Metadata.class,
+          CoderProviders.fromStaticMethods(Metadata.class, MetadataCoder.class));
+      builder.put(ResourceId.class,
+          CoderProviders.fromStaticMethods(ResourceId.class, ResourceIdCoder.class));
       builder.put(Set.class,
           CoderProviders.fromStaticMethods(Set.class, SetCoder.class));
       builder.put(String.class,

http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
new file mode 100644
index 0000000..5c9c4d7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+
+/** A {@link Coder} for {@link Metadata}: encodes resource id, seek-efficiency flag, then size. */
+public class MetadataCoder extends AtomicCoder<Metadata> {
+  private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of();
+  private static final VarIntCoder INT_CODER = VarIntCoder.of();
+  private static final VarLongCoder LONG_CODER = VarLongCoder.of();
+
+  /** Returns a new {@link MetadataCoder}; each call allocates a fresh instance. */
+  public static MetadataCoder of() {
+    return new MetadataCoder();
+  }
+
+  @Override
+  public void encode(Metadata value, OutputStream os) throws IOException {
+    RESOURCE_ID_CODER.encode(value.resourceId(), os);
+    INT_CODER.encode(value.isReadSeekEfficient() ? 1 : 0, os); // flag stored as int 1/0
+    LONG_CODER.encode(value.sizeBytes(), os);
+  }
+
+  @Override
+  public Metadata decode(InputStream is) throws IOException {
+    ResourceId resourceId = RESOURCE_ID_CODER.decode(is); // read order mirrors encode
+    boolean isReadSeekEfficient = INT_CODER.decode(is) == 1; // any value other than 1 is false
+    long sizeBytes = LONG_CODER.decode(is);
+    return Metadata.builder()
+        .setResourceId(resourceId)
+        .setIsReadSeekEfficient(isReadSeekEfficient)
+        .setSizeBytes(sizeBytes)
+        .build();
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
new file mode 100644
index 0000000..d7649c0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileSystems;
+
+/** A {@link Coder} for {@link ResourceId}: writes {@code toString()} and {@code isDirectory()}. */
+public class ResourceIdCoder extends AtomicCoder<ResourceId> {
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+  private static final Coder<Boolean> BOOL_CODER = BooleanCoder.of();
+
+  /** Returns a new {@link ResourceIdCoder}; each call allocates a fresh instance. */
+  public static ResourceIdCoder of() {
+    return new ResourceIdCoder();
+  }
+
+  @Override
+  public void encode(ResourceId value, OutputStream os) throws IOException {
+    STRING_CODER.encode(value.toString(), os); // the string form is what decode passes back in
+    BOOL_CODER.encode(value.isDirectory(), os);
+  }
+
+  @Override
+  public ResourceId decode(InputStream is) throws IOException {
+    String spec = STRING_CODER.decode(is); // read order mirrors encode
+    boolean isDirectory = BOOL_CODER.decode(is);
+    return FileSystems.matchNewResource(spec, isDirectory); // re-resolved through FileSystems
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index b21eb62..fc6f18d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DurationCoder;
@@ -958,7 +959,7 @@ public class Watch {
       return new GrowthStateCoder<>(outputCoder, terminationStateCoder);
     }
 
-    private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+    private static final Coder<Boolean> BOOLEAN_CODER = BooleanCoder.of();
     private static final Coder<Instant> INSTANT_CODER = NullableCoder.of(InstantCoder.of());
     private static final Coder<HashCode> HASH_CODE_CODER = HashCode128Coder.of();
 
@@ -980,7 +981,7 @@ public class Watch {
         throws IOException {
       completedCoder.encode(value.completed, os);
       pendingCoder.encode(value.pending, os);
-      INT_CODER.encode(value.isOutputComplete ? 1 : 0, os);
+      BOOLEAN_CODER.encode(value.isOutputComplete, os);
       terminationStateCoder.encode(value.terminationState, os);
       INSTANT_CODER.encode(value.pollWatermark, os);
     }
@@ -989,7 +990,7 @@ public class Watch {
     public GrowthState<OutputT, TerminationStateT> decode(InputStream is) throws IOException {
       Map<HashCode, Instant> completed = completedCoder.decode(is);
       List<TimestampedValue<OutputT>> pending = pendingCoder.decode(is);
-      boolean isOutputComplete = (INT_CODER.decode(is) == 1);
+      boolean isOutputComplete = BOOLEAN_CODER.decode(is);
       TerminationStateT terminationState = terminationStateCoder.decode(is);
       Instant pollWatermark = INSTANT_CODER.decode(is);
       return new GrowthState<>(