Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/27 17:28:35 UTC

[47/50] [abbrv] incubator-beam git commit: [BEAM-813] support metadata in Avro sink

[BEAM-813] support metadata in Avro sink


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba099f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba099f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba099f5

Branch: refs/heads/python-sdk
Commit: eba099f564dba3dfbba30ae3533496b9e14f57a7
Parents: 25102f7
Author: Neville Li <ne...@spotify.com>
Authored: Mon Oct 24 18:56:36 2016 -0400
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Oct 26 15:30:50 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 143 ++++++++++++++++---
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  29 ++++
 2 files changed, 154 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
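
In user code, the new option is chained onto AvroIO.Write, exactly as the test added at the bottom of this patch does. A minimal sketch of that usage (the Record POJO, output prefix, and pipeline options are illustrative stand-ins, not part of this commit):

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class AvroMetadataUsage {

  /** Simple POJO serialized via Avro reflection, standing in for the test's GenericClass. */
  @DefaultCoder(AvroCoder.class)
  public static class Record {
    int intField;
    String stringField;
    public Record() {}
    public Record(int i, String s) { this.intField = i; this.stringField = s; }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(Create.of(new Record(3, "hi"), new Record(5, "bar")))
        .apply(AvroIO.Write.to("/tmp/records")   // illustrative output prefix
            .withSchema(Record.class)            // schema inferred via reflection
            // Values may be String, Long, or byte[]; anything else is rejected
            // up front by the checkArgument added in Bound's constructor below.
            .withMetadata(ImmutableMap.<String, Object>of(
                "stringKey", "stringValue",
                "longKey", 100L,
                "bytesKey", "bytesValue".getBytes())));
    p.run();
  }
}
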


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d912ff7..6deca7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -21,11 +21,16 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.io.BaseEncoding;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.Map;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
@@ -39,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -455,6 +461,15 @@ public class AvroIO {
     }
 
     /**
+     * Returns a {@link PTransform} that writes Avro file(s) with the specified metadata.
+     *
+     * <p>Supported value types are String, Long, and byte[].
+     */
+    public static Bound<GenericRecord> withMetadata(Map<String, Object> metadata) {
+      return new Bound<>(GenericRecord.class).withMetadata(metadata);
+    }
+
+    /**
      * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
      * multiple Avro files matching a sharding pattern).
      *
@@ -464,6 +479,8 @@ public class AvroIO {
       private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
       private static final SerializableAvroCodecFactory DEFAULT_CODEC =
           new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
+      // Keep this a multiple of 4 so a truncated base64 string never ends on a partially encoded byte.
+      private static final int METADATA_BYTES_MAX_LENGTH = 40;
 
       /** The filename to write to. */
       @Nullable
@@ -486,6 +503,8 @@ public class AvroIO {
        * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
        */
       final SerializableAvroCodecFactory codec;
+      /** Avro file metadata. */
+      final ImmutableMap<String, Object> metadata;
 
       Bound(Class<T> type) {
         this(
@@ -497,7 +516,8 @@ public class AvroIO {
             type,
             null,
             true,
-            DEFAULT_CODEC);
+            DEFAULT_CODEC,
+            ImmutableMap.<String, Object>of());
       }
 
       Bound(
@@ -509,7 +529,8 @@ public class AvroIO {
           Class<T> type,
           Schema schema,
           boolean validate,
-          SerializableAvroCodecFactory codec) {
+          SerializableAvroCodecFactory codec,
+          Map<String, Object> metadata) {
         super(name);
         this.filenamePrefix = filenamePrefix;
         this.filenameSuffix = filenameSuffix;
@@ -519,6 +540,18 @@ public class AvroIO {
         this.schema = schema;
         this.validate = validate;
         this.codec = codec;
+
+        Map<String, String> badKeys = Maps.newLinkedHashMap();
+        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+          Object v = entry.getValue();
+          if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
+            badKeys.put(entry.getKey(), v.getClass().getSimpleName());
+          }
+        }
+        checkArgument(
+            badKeys.isEmpty(),
+            "Metadata value type must be one of String, Long, or byte[]. Found {}", badKeys);
+        this.metadata = ImmutableMap.copyOf(metadata);
       }
 
       /**
@@ -541,7 +574,8 @@ public class AvroIO {
             type,
             schema,
             validate,
-            codec);
+            codec,
+            metadata);
       }
 
       /**
@@ -563,7 +597,8 @@ public class AvroIO {
             type,
             schema,
             validate,
-            codec);
+            codec,
+            metadata);
       }
 
       /**
@@ -591,7 +626,8 @@ public class AvroIO {
             type,
             schema,
             validate,
-            codec);
+            codec,
+            metadata);
       }
 
       /**
@@ -612,7 +648,8 @@ public class AvroIO {
             type,
             schema,
             validate,
-            codec);
+            codec,
+            metadata);
       }
 
       /**
@@ -634,7 +671,8 @@ public class AvroIO {
             type,
             schema,
             validate,
-            codec);
+            codec,
+            metadata);
       }
 
       /**
@@ -656,7 +694,8 @@ public class AvroIO {
             type,
             ReflectData.get().getSchema(type),
             validate,
-            codec);
+            codec,
+            metadata);
       }
 
       /**
@@ -676,7 +715,8 @@ public class AvroIO {
             GenericRecord.class,
             schema,
             validate,
-            codec);
+            codec,
+            metadata);
       }
 
       /**
@@ -710,7 +750,8 @@ public class AvroIO {
             type,
             schema,
             false,
-            codec);
+            codec,
+            metadata);
       }
 
       /**
@@ -729,7 +770,28 @@ public class AvroIO {
             type,
             schema,
             validate,
-            new SerializableAvroCodecFactory(codec));
+            new SerializableAvroCodecFactory(codec),
+            metadata);
+      }
+
+      /**
+       * Returns a new {@link PTransform} that's like this one but
+       * that writes to Avro file(s) with the specified metadata.
+       *
+       * <p>Does not modify this object.
+       */
+      public Bound<T> withMetadata(Map<String, Object> metadata) {
+        return new Bound<>(
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            validate,
+            codec,
+            metadata);
       }
 
       @Override
@@ -749,7 +811,8 @@ public class AvroIO {
                     filenameSuffix,
                     shardTemplate,
                     AvroCoder.of(type, schema),
-                    codec));
+                    codec,
+                    metadata));
         if (getNumShards() > 0) {
           write = write.withNumShards(getNumShards());
         }
@@ -779,6 +842,24 @@ public class AvroIO {
             .addIfNotDefault(DisplayData.item("codec", codec.toString())
                 .withLabel("Avro Compression Codec"),
                 DEFAULT_CODEC.toString());
+        builder.include("Metadata", new Metadata());
+      }
+
+      private class Metadata implements HasDisplayData {
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+          for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+            DisplayData.Type type = DisplayData.inferType(entry.getValue());
+            if (type != null) {
+              builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
+            } else {
+              String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
+              String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
+                  ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
+              builder.add(DisplayData.item(entry.getKey(), repr));
+            }
+          }
+        }
       }
 
       /**
@@ -824,6 +905,10 @@ public class AvroIO {
       public CodecFactory getCodec() {
         return codec.getCodec();
       }
+
+      public Map<String, Object> getMetadata() {
+        return metadata;
+      }
     }
 
     /** Disallow construction of utility class. */
@@ -853,6 +938,7 @@ public class AvroIO {
   static class AvroSink<T> extends FileBasedSink<T> {
     private final AvroCoder<T> coder;
     private final SerializableAvroCodecFactory codec;
+    private final ImmutableMap<String, Object> metadata;
 
     @VisibleForTesting
     AvroSink(
@@ -860,16 +946,17 @@ public class AvroIO {
         String extension,
         String fileNameTemplate,
         AvroCoder<T> coder,
-        SerializableAvroCodecFactory codec) {
+        SerializableAvroCodecFactory codec,
+        ImmutableMap<String, Object> metadata) {
       super(baseOutputFilename, extension, fileNameTemplate);
       this.coder = coder;
       this.codec = codec;
-
+      this.metadata = metadata;
     }
 
     @Override
     public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
-      return new AvroWriteOperation<>(this, coder, codec);
+      return new AvroWriteOperation<>(this, coder, codec, metadata);
     }
 
     /**
@@ -879,18 +966,21 @@ public class AvroIO {
     private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> {
       private final AvroCoder<T> coder;
       private final SerializableAvroCodecFactory codec;
+      private final ImmutableMap<String, Object> metadata;
 
       private AvroWriteOperation(AvroSink<T> sink,
                                  AvroCoder<T> coder,
-                                 SerializableAvroCodecFactory codec) {
+                                 SerializableAvroCodecFactory codec,
+                                 ImmutableMap<String, Object> metadata) {
         super(sink);
         this.coder = coder;
         this.codec = codec;
+        this.metadata = metadata;
       }
 
       @Override
       public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
-        return new AvroWriter<>(this, coder, codec);
+        return new AvroWriter<>(this, coder, codec, metadata);
       }
     }
 
@@ -902,20 +992,37 @@ public class AvroIO {
       private final AvroCoder<T> coder;
       private DataFileWriter<T> dataFileWriter;
       private SerializableAvroCodecFactory codec;
+      private final ImmutableMap<String, Object> metadata;
 
       public AvroWriter(FileBasedWriteOperation<T> writeOperation,
                         AvroCoder<T> coder,
-                        SerializableAvroCodecFactory codec) {
+                        SerializableAvroCodecFactory codec,
+                        ImmutableMap<String, Object> metadata) {
         super(writeOperation);
         this.mimeType = MimeTypes.BINARY;
         this.coder = coder;
         this.codec = codec;
+        this.metadata = metadata;
       }
 
       @SuppressWarnings("deprecation") // uses internal test functionality.
       @Override
       protected void prepareWrite(WritableByteChannel channel) throws Exception {
         dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()).setCodec(codec.getCodec());
+        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+          Object v = entry.getValue();
+          if (v instanceof String) {
+            dataFileWriter.setMeta(entry.getKey(), (String) v);
+          } else if (v instanceof Long) {
+            dataFileWriter.setMeta(entry.getKey(), (Long) v);
+          } else if (v instanceof byte[]) {
+            dataFileWriter.setMeta(entry.getKey(), (byte[]) v);
+          } else {
+            throw new IllegalStateException(
+                "Metadata value type must be one of String, Long, or byte[]. Found "
+                    + v.getClass().getSimpleName());
+          }
+        }
         dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
       }
 

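The prepareWrite() loop above hands each entry to one of Avro's DataFileWriter.setMeta overloads before create() is called; Avro rejects metadata set after the file header has been written. For reference, the same mechanism outside Beam looks roughly like this (a standalone sketch; the schema, path, and record contents are illustrative):

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroMetadataSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = SchemaBuilder.record("Example").fields()
        .requiredString("name").endRecord();
    File file = new File("example.avro");   // illustrative path

    // Write side: metadata must be set before create(), mirroring prepareWrite().
    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
    writer.setMeta("stringKey", "stringValue");
    writer.setMeta("longKey", 100L);
    writer.setMeta("bytesKey", "bytesValue".getBytes());
    writer.create(schema, file);
    GenericRecord record = new GenericData.Record(schema);
    record.put("name", "hi");
    writer.append(record);
    writer.close();

    // Read side: the metadata lives in the file header, independent of the records.
    DataFileReader<GenericRecord> reader =
        new DataFileReader<>(file, new GenericDatumReader<GenericRecord>(schema));
    System.out.println(reader.getMetaString("stringKey"));   // stringValue
    System.out.println(reader.getMetaLong("longKey"));       // 100
    reader.close();
  }
}
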
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4825875..1a07177 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -21,6 +21,7 @@ import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -28,6 +29,7 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import java.io.File;
 import java.io.FileInputStream;
@@ -315,6 +317,33 @@ public class AvroIOTest {
     assertEquals(CodecFactory.xzCodec(9).toString(), serdeWrite.getCodec().toString());
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  @Category(NeedsRunner.class)
+  public void testMetadata() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
+        new GenericClass(5, "bar"));
+    File outputFile = tmpFolder.newFile("output.avro");
+
+    p.apply(Create.of(values))
+        .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+            .withoutSharding()
+            .withSchema(GenericClass.class)
+            .withMetadata(ImmutableMap.<String, Object>of(
+                "stringKey", "stringValue",
+                "longKey", 100L,
+                "bytesKey", "bytesValue".getBytes())));
+    p.run();
+
+    DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
+        new GenericDatumReader());
+    assertEquals("stringValue", dataFileStream.getMetaString("stringKey"));
+    assertEquals(100L, dataFileStream.getMetaLong("longKey"));
+    assertArrayEquals("bytesValue".getBytes(), dataFileStream.getMeta("bytesKey"));
+  }
+
+
   @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests.
   private void runTestWrite(String[] expectedElements, int numShards) throws IOException {
     File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
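
One behavior worth noting from the Bound constructor in the AvroIO.java hunk above: an unsupported metadata value type fails fast when the transform is constructed, not at write time. A self-contained sketch of what a caller would see (the output path and schema class are illustrative placeholders):

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.AvroIO;

public class AvroMetadataValidationSketch {
  public static void main(String[] args) {
    try {
      // Integer is not one of String, Long, or byte[], so the checkArgument in
      // Bound's constructor throws before any pipeline is built or run.
      AvroIO.Write.to("/tmp/out")                      // illustrative path
          .withSchema(String.class)                    // placeholder schema class
          .withMetadata(ImmutableMap.<String, Object>of("intKey", 42));
    } catch (IllegalArgumentException expected) {
      // The message names the offending key(s) and value type(s), e.g. intKey -> Integer.
      System.out.println(expected.getMessage());
    }
  }
}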