You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/26 17:40:09 UTC
[1/2] beam git commit: [BEAM-1871] Remove deprecated methods from
AvroCoder
Repository: beam
Updated Branches:
refs/heads/master 99cf72e7d -> c5cf90c70
[BEAM-1871] Remove deprecated methods from AvroCoder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a503d349
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a503d349
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a503d349
Branch: refs/heads/master
Commit: a503d349d93bd3ef9077c0f4fae713dbb452f5d6
Parents: 99cf72e
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 26 10:07:15 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 26 10:32:44 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/AvroCoder.java | 55 +++++---------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 11 ++--
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 14 ++---
.../org/apache/beam/sdk/io/AvroSourceTest.java | 6 ++-
4 files changed, 33 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 91822bf..d60a2ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import java.io.IOException;
import java.io.InputStream;
@@ -224,17 +223,20 @@ public class AvroCoder<T> extends CustomCoder<T> {
private final AvroCoder<T> myCoder = AvroCoder.this;
@Override
public DatumReader<T> initialValue() {
- return myCoder.createDatumReader();
+ return myCoder.getType().equals(GenericRecord.class)
+ ? new GenericDatumReader<T>(myCoder.getSchema())
+ : new ReflectDatumReader<T>(myCoder.getSchema());
+ }
+ };
+ this.writer = new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
+ private final AvroCoder<T> myCoder = AvroCoder.this;
+ @Override
+ public DatumWriter<T> initialValue() {
+ return myCoder.getType().equals(GenericRecord.class)
+ ? new GenericDatumWriter<T>(myCoder.getSchema())
+ : new ReflectDatumWriter<T>(myCoder.getSchema());
}
};
- this.writer =
- new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
- private final AvroCoder<T> myCoder = AvroCoder.this;
- @Override
- public DatumWriter<T> initialValue() {
- return myCoder.createDatumWriter();
- }
- };
}
/**
@@ -314,39 +316,6 @@ public class AvroCoder<T> extends CustomCoder<T> {
}
/**
- * Returns a new {@link DatumReader} that can be used to read from an Avro file directly. Assumes
- * the schema used to read is the same as the schema that was used when writing.
- *
- * @deprecated For {@code AvroCoder} internal use only.
- */
- // TODO: once we can remove this deprecated function, inline in constructor.
- @Deprecated
- @VisibleForTesting
- public DatumReader<T> createDatumReader() {
- if (type.equals(GenericRecord.class)) {
- return new GenericDatumReader<>(schemaSupplier.get());
- } else {
- return new ReflectDatumReader<>(schemaSupplier.get());
- }
- }
-
- /**
- * Returns a new {@link DatumWriter} that can be used to write to an Avro file directly.
- *
- * @deprecated For {@code AvroCoder} internal use only.
- */
- @Deprecated
- @VisibleForTesting
- // TODO: once we can remove this deprecated function, inline in constructor.
- public DatumWriter<T> createDatumWriter() {
- if (type.equals(GenericRecord.class)) {
- return new GenericDatumWriter<>(schemaSupplier.get());
- } else {
- return new ReflectDatumWriter<>(schemaSupplier.get());
- }
- }
-
- /**
* Returns the schema used by this coder.
*/
public Schema getSchema() {
http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 6a06972..24e158f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -23,18 +23,19 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.BaseEncoding;
-
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
-
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -1041,7 +1042,11 @@ public class AvroIO {
@SuppressWarnings("deprecation") // uses internal test functionality.
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
- dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()).setCodec(codec.getCodec());
+ DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
+ ? new GenericDatumWriter<T>(coder.getSchema())
+ : new ReflectDatumWriter<T>(coder.getSchema());
+
+ dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec());
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
Object v = entry.getValue();
if (v instanceof String) {
http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index a988d85..3e1c4b8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -31,7 +31,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -41,7 +40,6 @@ import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
-
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
@@ -49,6 +47,8 @@ import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.AvroIO.Write.Bound;
@@ -321,7 +321,7 @@ public class AvroIOTest {
public TestPipeline windowedAvroWritePipeline = TestPipeline.create();
@Test
- @Category({ValidatesRunner.class, UsesTestStream.class })
+ @Category({ValidatesRunner.class, UsesTestStream.class})
public void testWindowedAvroIOWrite() throws Throwable {
File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
final String outputFilePrefix = baseOutputFile.getAbsolutePath();
@@ -393,8 +393,9 @@ public class AvroIOTest {
for (File outputFile : expectedFiles) {
assertTrue("Expected output file " + outputFile.getAbsolutePath(), outputFile.exists());
try (DataFileReader<GenericClass> reader =
- new DataFileReader<>(outputFile, AvroCoder.of(
- GenericClass.class).createDatumReader())) {
+ new DataFileReader<>(outputFile,
+ new ReflectDatumReader<GenericClass>(
+ ReflectData.get().getSchema(GenericClass.class)))) {
Iterators.addAll(actualElements, reader);
}
outputFile.delete();
@@ -504,7 +505,8 @@ public class AvroIOTest {
for (File outputFile : expectedFiles) {
assertTrue("Expected output file " + outputFile.getName(), outputFile.exists());
try (DataFileReader<String> reader =
- new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) {
+ new DataFileReader<>(outputFile,
+ new ReflectDatumReader(ReflectData.get().getSchema(String.class)))) {
Iterators.addAll(actualElements, reader);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a503d349/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index a6b6db5..d6facba 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -42,11 +42,13 @@ import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.AvroSource.AvroMetadata;
@@ -98,7 +100,9 @@ public class AvroSourceTest {
String path = tmpFile.toString();
FileOutputStream os = new FileOutputStream(tmpFile);
- DatumWriter<T> datumWriter = coder.createDatumWriter();
+ DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
+ ? new GenericDatumWriter<T>(coder.getSchema())
+ : new ReflectDatumWriter<T>(coder.getSchema());
try (DataFileWriter<T> writer = new DataFileWriter<>(datumWriter)) {
writer.setCodec(CodecFactory.fromString(codec));
writer.create(coder.getSchema(), os);
[2/2] beam git commit: [BEAM-1871] Remove deprecated methods from
AvroCoder
Posted by lc...@apache.org.
[BEAM-1871] Remove deprecated methods from AvroCoder
This closes #2710
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c5cf90c7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c5cf90c7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c5cf90c7
Branch: refs/heads/master
Commit: c5cf90c7047c06f0d7c5e1d064843f1a26371dcd
Parents: 99cf72e a503d34
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 26 10:39:59 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 26 10:39:59 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/coders/AvroCoder.java | 55 +++++---------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 11 ++--
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 14 ++---
.../org/apache/beam/sdk/io/AvroSourceTest.java | 6 ++-
4 files changed, 33 insertions(+), 53 deletions(-)
----------------------------------------------------------------------