You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/28 14:48:09 UTC
[35/50] incubator-beam git commit: [BEAM-813] support metadata in Avro sink
[BEAM-813] support metadata in Avro sink
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba099f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba099f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba099f5
Branch: refs/heads/apex-runner
Commit: eba099f564dba3dfbba30ae3533496b9e14f57a7
Parents: 25102f7
Author: Neville Li <ne...@spotify.com>
Authored: Mon Oct 24 18:56:36 2016 -0400
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Oct 26 15:30:50 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 143 ++++++++++++++++---
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 29 ++++
2 files changed, 154 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d912ff7..6deca7f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -21,11 +21,16 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.io.BaseEncoding;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
+import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
+
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
@@ -39,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PBegin;
@@ -455,6 +461,15 @@ public class AvroIO {
}
/**
+ * Returns a {@link PTransform} that writes {@link GenericRecord} Avro file(s) carrying the
+ * given key/value pairs as Avro file metadata.
+ *
+ * <p>Each value must be a String, Long, or byte[]; any other value type is rejected with an
+ * {@link IllegalArgumentException} when the transform is constructed.
+ */
+ public static Bound<GenericRecord> withMetadata(Map<String, Object> metadata) {
+ Bound<GenericRecord> base = new Bound<>(GenericRecord.class);
+ return base.withMetadata(metadata);
+ }
+
+ /**
* A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
* multiple Avro files matching a sharding pattern).
*
@@ -464,6 +479,8 @@ public class AvroIO {
private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
private static final SerializableAvroCodecFactory DEFAULT_CODEC =
new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
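+ // Keep this a multiple of 4: base64 emits 4 characters per 3 input bytes, so truncating
+ // at a multiple of 4 never splits an encoded group (no partial byte in the preview).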
+ private static final int METADATA_BYTES_MAX_LENGTH = 40;
/** The filename to write to. */
@Nullable
@@ -486,6 +503,8 @@ public class AvroIO {
* https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
*/
final SerializableAvroCodecFactory codec;
+ /** Avro file metadata. */
+ final ImmutableMap<String, Object> metadata;
Bound(Class<T> type) {
this(
@@ -497,7 +516,8 @@ public class AvroIO {
type,
null,
true,
- DEFAULT_CODEC);
+ DEFAULT_CODEC,
+ ImmutableMap.<String, Object>of());
}
Bound(
@@ -509,7 +529,8 @@ public class AvroIO {
Class<T> type,
Schema schema,
boolean validate,
- SerializableAvroCodecFactory codec) {
+ SerializableAvroCodecFactory codec,
+ Map<String, Object> metadata) {
super(name);
this.filenamePrefix = filenamePrefix;
this.filenameSuffix = filenameSuffix;
@@ -519,6 +540,18 @@ public class AvroIO {
this.schema = schema;
this.validate = validate;
this.codec = codec;
+
+ // Validate every metadata value up front and report all offending keys together with
+ // the type that was found, so callers see one complete error instead of the first one.
+ Map<String, String> badKeys = Maps.newLinkedHashMap();
+ for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+ Object v = entry.getValue();
+ if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
+ // Guard null explicitly: v.getClass() would otherwise throw a bare NullPointerException.
+ badKeys.put(entry.getKey(), v == null ? "null" : v.getClass().getSimpleName());
+ }
+ }
+ // Guava's checkArgument substitutes only %s placeholders (SLF4J-style {} is not expanded).
+ checkArgument(
+ badKeys.isEmpty(),
+ "Metadata value type must be one of String, Long, or byte[]. Found %s", badKeys);
+ this.metadata = ImmutableMap.copyOf(metadata);
}
/**
@@ -541,7 +574,8 @@ public class AvroIO {
type,
schema,
validate,
- codec);
+ codec,
+ metadata);
}
/**
@@ -563,7 +597,8 @@ public class AvroIO {
type,
schema,
validate,
- codec);
+ codec,
+ metadata);
}
/**
@@ -591,7 +626,8 @@ public class AvroIO {
type,
schema,
validate,
- codec);
+ codec,
+ metadata);
}
/**
@@ -612,7 +648,8 @@ public class AvroIO {
type,
schema,
validate,
- codec);
+ codec,
+ metadata);
}
/**
@@ -634,7 +671,8 @@ public class AvroIO {
type,
schema,
validate,
- codec);
+ codec,
+ metadata);
}
/**
@@ -656,7 +694,8 @@ public class AvroIO {
type,
ReflectData.get().getSchema(type),
validate,
- codec);
+ codec,
+ metadata);
}
/**
@@ -676,7 +715,8 @@ public class AvroIO {
GenericRecord.class,
schema,
validate,
- codec);
+ codec,
+ metadata);
}
/**
@@ -710,7 +750,8 @@ public class AvroIO {
type,
schema,
false,
- codec);
+ codec,
+ metadata);
}
/**
@@ -729,7 +770,28 @@ public class AvroIO {
type,
schema,
validate,
- new SerializableAvroCodecFactory(codec));
+ new SerializableAvroCodecFactory(codec),
+ metadata);
+ }
+
+ /**
+ * Returns a new {@link PTransform} that's like this one but that writes the given key/value
+ * pairs as Avro file metadata. The supplied map replaces (is not merged with) any metadata
+ * configured earlier; the constructor validates that every value is a String, Long, or
+ * byte[] and keeps an immutable copy.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound<T> withMetadata(Map<String, Object> metadata) {
+ return new Bound<T>(
+ name, filenamePrefix, filenameSuffix, numShards, shardTemplate,
+ type, schema, validate, codec, metadata);
+ }
@Override
@@ -749,7 +811,8 @@ public class AvroIO {
filenameSuffix,
shardTemplate,
AvroCoder.of(type, schema),
- codec));
+ codec,
+ metadata));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -779,6 +842,24 @@ public class AvroIO {
.addIfNotDefault(DisplayData.item("codec", codec.toString())
.withLabel("Avro Compression Codec"),
DEFAULT_CODEC.toString());
+ builder.include("Metadata", new Metadata());
+ }
+
+ /** Exposes the configured Avro file metadata entries as display data items. */
+ private class Metadata implements HasDisplayData {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ DisplayData.Type inferred = DisplayData.inferType(value);
+ if (inferred == null) {
+ // No display type could be inferred; presumably the value is a byte[] (the only
+ // remaining allowed metadata type), so show a truncated base64 preview instead.
+ builder.add(DisplayData.item(key, base64Preview((byte[]) value)));
+ } else {
+ builder.add(DisplayData.item(key, inferred, value));
+ }
+ }
+ }
+
+ /** Base64-encodes {@code bytes}, cutting it to METADATA_BYTES_MAX_LENGTH chars plus "...". */
+ private String base64Preview(byte[] bytes) {
+ String encoded = BaseEncoding.base64().encode(bytes);
+ return encoded.length() > METADATA_BYTES_MAX_LENGTH
+ ? encoded.substring(0, METADATA_BYTES_MAX_LENGTH) + "..."
+ : encoded;
+ }
}
/**
@@ -824,6 +905,10 @@ public class AvroIO {
public CodecFactory getCodec() {
return codec.getCodec();
}
+
+ /** Returns the immutable metadata map that is passed to the Avro sink for each output file. */
+ public Map<String, Object> getMetadata() {
+ return this.metadata;
+ }
}
/** Disallow construction of utility class. */
@@ -853,6 +938,7 @@ public class AvroIO {
static class AvroSink<T> extends FileBasedSink<T> {
private final AvroCoder<T> coder;
private final SerializableAvroCodecFactory codec;
+ private final ImmutableMap<String, Object> metadata;
@VisibleForTesting
AvroSink(
@@ -860,16 +946,17 @@ public class AvroIO {
String extension,
String fileNameTemplate,
AvroCoder<T> coder,
- SerializableAvroCodecFactory codec) {
+ SerializableAvroCodecFactory codec,
+ ImmutableMap<String, Object> metadata) {
super(baseOutputFilename, extension, fileNameTemplate);
this.coder = coder;
this.codec = codec;
-
+ this.metadata = metadata;
}
+ /** Creates the write operation, forwarding this sink's coder, codec, and file metadata. */
@Override
public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
- return new AvroWriteOperation<>(this, coder, codec);
+ return new AvroWriteOperation<>(this, coder, codec, metadata);
}
/**
@@ -879,18 +966,21 @@ public class AvroIO {
private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> {
private final AvroCoder<T> coder;
private final SerializableAvroCodecFactory codec;
+ /** Avro file metadata handed unchanged to every writer created by this operation. */
+ private final ImmutableMap<String, Object> metadata;
private AvroWriteOperation(AvroSink<T> sink,
AvroCoder<T> coder,
- SerializableAvroCodecFactory codec) {
+ SerializableAvroCodecFactory codec,
+ ImmutableMap<String, Object> metadata) {
super(sink);
this.coder = coder;
this.codec = codec;
+ this.metadata = metadata;
}
@Override
public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
- return new AvroWriter<>(this, coder, codec);
+ // Every writer receives the same (immutable) metadata map.
+ return new AvroWriter<>(this, coder, codec, metadata);
}
}
@@ -902,20 +992,37 @@ public class AvroIO {
private final AvroCoder<T> coder;
private DataFileWriter<T> dataFileWriter;
private SerializableAvroCodecFactory codec;
+ private final ImmutableMap<String, Object> metadata;
public AvroWriter(FileBasedWriteOperation<T> writeOperation,
AvroCoder<T> coder,
- SerializableAvroCodecFactory codec) {
+ SerializableAvroCodecFactory codec,
+ ImmutableMap<String, Object> metadata) {
super(writeOperation);
this.mimeType = MimeTypes.BINARY;
this.coder = coder;
this.codec = codec;
+ // Applied to each output file via DataFileWriter.setMeta in prepareWrite.
+ this.metadata = metadata;
}
@SuppressWarnings("deprecation") // uses internal test functionality.
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()).setCodec(codec.getCodec());
+ // Register every metadata entry before create() is called on the writer. Values are
+ // expected to be validated upstream (Bound's constructor), so the final branch is defensive.
+ for (Map.Entry<String, Object> metaEntry : metadata.entrySet()) {
+ String metaKey = metaEntry.getKey();
+ Object metaValue = metaEntry.getValue();
+ if (metaValue instanceof byte[]) {
+ dataFileWriter.setMeta(metaKey, (byte[]) metaValue);
+ } else if (metaValue instanceof Long) {
+ dataFileWriter.setMeta(metaKey, (Long) metaValue);
+ } else if (metaValue instanceof String) {
+ dataFileWriter.setMeta(metaKey, (String) metaValue);
+ } else {
+ throw new IllegalStateException(
+ "Metadata value type must be one of String, Long, or byte[]. Found "
+ + metaValue.getClass().getSimpleName());
+ }
+ }
dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4825875..1a07177 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -21,6 +21,7 @@ import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -28,6 +29,7 @@ import static org.junit.Assert.assertTrue;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import java.io.File;
import java.io.FileInputStream;
@@ -315,6 +317,33 @@ public class AvroIOTest {
assertEquals(CodecFactory.xzCodec(9).toString(), serdeWrite.getCodec().toString());
}
+ /** Writes records with String, Long, and byte[] metadata and verifies each entry reads back. */
+ @Test
+ @SuppressWarnings("unchecked")
+ @Category(NeedsRunner.class)
+ public void testMetdata() throws Exception {
+ TestPipeline p = TestPipeline.create();
+ List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
+ new GenericClass(5, "bar"));
+ File outputFile = tmpFolder.newFile("output.avro");
+
+ p.apply(Create.of(values))
+ .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+ .withoutSharding()
+ .withSchema(GenericClass.class)
+ .withMetadata(ImmutableMap.<String, Object>of(
+ "stringKey", "stringValue",
+ "longKey", 100L,
+ "bytesKey", "bytesValue".getBytes())));
+ p.run();
+
+ // try-with-resources closes the reader and its underlying FileInputStream, which the
+ // test previously leaked.
+ try (DataFileStream dataFileStream = new DataFileStream(new FileInputStream(outputFile),
+ new GenericDatumReader())) {
+ assertEquals("stringValue", dataFileStream.getMetaString("stringKey"));
+ assertEquals(100L, dataFileStream.getMetaLong("longKey"));
+ assertArrayEquals("bytesValue".getBytes(), dataFileStream.getMeta("bytesKey"));
+ }
+ }
+
@SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests.
private void runTestWrite(String[] expectedElements, int numShards) throws IOException {
File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");