You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/26 16:20:44 UTC
[1/2] beam git commit: [BEAM-1871] Move functionality from AvroUtils to AvroSource hiding visibility of internal method, also remove deprecated method.
Repository: beam
Updated Branches:
refs/heads/master c7eccbbde -> e6dfd035f
[BEAM-1871] Move functionality from AvroUtils to AvroSource hiding visibility of internal method, also remove deprecated method.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5029f1ef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5029f1ef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5029f1ef
Branch: refs/heads/master
Commit: 5029f1effda0fe0d036d239f0e604c970fc980b6
Parents: c7eccbb
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 26 08:51:45 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 26 08:51:45 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroSource.java | 108 ++++++++++++-
.../org/apache/beam/sdk/util/AvroUtils.java | 147 ------------------
.../org/apache/beam/sdk/io/AvroSourceTest.java | 41 ++++-
.../org/apache/beam/sdk/util/AvroUtilsTest.java | 152 -------------------
4 files changed, 141 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5029f1ef/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 5e0900a..58e6555 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -51,10 +51,9 @@ import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.util.AvroUtils;
-import org.apache.beam.sdk.util.AvroUtils.AvroMetadata;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
@@ -254,7 +253,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
if (codec == null || syncMarker == null || fileSchemaString == null) {
AvroMetadata metadata;
try {
- metadata = AvroUtils.readMetadataFromFile(fileMetadata.resourceId());
+ metadata = readMetadataFromFile(fileMetadata.resourceId());
} catch (IOException e) {
throw new RuntimeException("Error reading metadata from file " + fileMetadata, e);
}
@@ -327,6 +326,109 @@ public class AvroSource<T> extends BlockBasedSource<T> {
return codec;
}
+ /**
+ * Avro file metadata.
+ */
+ @VisibleForTesting
+ static class AvroMetadata {
+ private byte[] syncMarker;
+ private String codec;
+ private String schemaString;
+
+ AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
+ this.syncMarker = checkNotNull(syncMarker, "syncMarker");
+ this.codec = checkNotNull(codec, "codec");
+ this.schemaString = checkNotNull(schemaString, "schemaString");
+ }
+
+ /**
+ * The JSON-encoded <a href="https://avro.apache.org/docs/1.7.7/spec.html#schemas">schema</a>
+ * string for the file.
+ */
+ public String getSchemaString() {
+ return schemaString;
+ }
+
+ /**
+ * The <a href="https://avro.apache.org/docs/1.7.7/spec.html#Required+Codecs">codec</a> of the
+ * file.
+ */
+ public String getCodec() {
+ return codec;
+ }
+
+ /**
+ * The 16-byte sync marker for the file. See the documentation for
+ * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object
+ * Container File</a> for more information.
+ */
+ public byte[] getSyncMarker() {
+ return syncMarker;
+ }
+ }
+
+ /**
+ * Reads the {@link AvroMetadata} from the header of an Avro file.
+ *
+ * <p>This method parses the header of an Avro
+ * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">
+ * Object Container File</a>.
+ *
+ * @throws IOException if the file is an invalid format.
+ */
+ @VisibleForTesting
+ static AvroMetadata readMetadataFromFile(ResourceId fileResource) throws IOException {
+ String codec = null;
+ String schemaString = null;
+ byte[] syncMarker;
+ try (InputStream stream = Channels.newInputStream(FileSystems.open(fileResource))) {
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
+
+ // The header of an object container file begins with a four-byte magic number, followed
+ // by the file metadata (including the schema and codec), encoded as a map. Finally, the
+ // header ends with the file's 16-byte sync marker.
+ // See https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files for details on
+ // the encoding of container files.
+
+ // Read the magic number.
+ byte[] magic = new byte[DataFileConstants.MAGIC.length];
+ decoder.readFixed(magic);
+ if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {
+ throw new IOException("Missing Avro file signature: " + fileResource);
+ }
+
+ // Read the metadata to find the codec and schema.
+ ByteBuffer valueBuffer = ByteBuffer.allocate(512);
+ long numRecords = decoder.readMapStart();
+ while (numRecords > 0) {
+ for (long recordIndex = 0; recordIndex < numRecords; recordIndex++) {
+ String key = decoder.readString();
+ // readBytes() clears the buffer and returns a buffer where:
+ // - position is the start of the bytes read
+ // - limit is the end of the bytes read
+ valueBuffer = decoder.readBytes(valueBuffer);
+ byte[] bytes = new byte[valueBuffer.remaining()];
+ valueBuffer.get(bytes);
+ if (key.equals(DataFileConstants.CODEC)) {
+ codec = new String(bytes, "UTF-8");
+ } else if (key.equals(DataFileConstants.SCHEMA)) {
+ schemaString = new String(bytes, "UTF-8");
+ }
+ }
+ numRecords = decoder.mapNext();
+ }
+ if (codec == null) {
+ codec = DataFileConstants.NULL_CODEC;
+ }
+
+ // Finally, read the sync marker.
+ syncMarker = new byte[DataFileConstants.SYNC_SIZE];
+ decoder.readFixed(syncMarker);
+ }
+ checkState(schemaString != null, "No schema present in Avro file metadata %s", fileResource);
+ return new AvroMetadata(syncMarker, codec, schemaString);
+ }
+
private DatumReader<T> createDatumReader() {
Schema readSchema = getReadSchema();
Schema fileSchema = getFileSchema();
http://git-wip-us.apache.org/repos/asf/beam/blob/5029f1ef/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
deleted file mode 100644
index 232f5eb..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.util.Arrays;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.fs.ResourceId;
-
-/**
- * A set of utilities for working with Avro files.
- *
- * <p>These utilities are based on the <a
- * href="https://avro.apache.org/docs/1.7.7/spec.html">Avro 1.7.7</a> specification.
- */
-public class AvroUtils {
-
- /**
- * Avro file metadata.
- */
- public static class AvroMetadata {
- private byte[] syncMarker;
- private String codec;
- private String schemaString;
-
- AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
- this.syncMarker = checkNotNull(syncMarker, "syncMarker");
- this.codec = checkNotNull(codec, "codec");
- this.schemaString = checkNotNull(schemaString, "schemaString");
- }
-
- /**
- * The JSON-encoded <a href="https://avro.apache.org/docs/1.7.7/spec.html#schemas">schema</a>
- * string for the file.
- */
- public String getSchemaString() {
- return schemaString;
- }
-
- /**
- * The <a href="https://avro.apache.org/docs/1.7.7/spec.html#Required+Codecs">codec</a> of the
- * file.
- */
- public String getCodec() {
- return codec;
- }
-
- /**
- * The 16-byte sync marker for the file. See the documentation for
- * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object
- * Container File</a> for more information.
- */
- public byte[] getSyncMarker() {
- return syncMarker;
- }
- }
-
- @Deprecated // to be deleted
- public static AvroMetadata readMetadataFromFile(String filename) throws IOException {
- return readMetadataFromFile(FileSystems.matchSingleFileSpec(filename).resourceId());
- }
-
- /**
- * Reads the {@link AvroMetadata} from the header of an Avro file.
- *
- * <p>This method parses the header of an Avro
- * <a href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">
- * Object Container File</a>.
- *
- * @throws IOException if the file is an invalid format.
- */
- public static AvroMetadata readMetadataFromFile(ResourceId fileResource) throws IOException {
- String codec = null;
- String schemaString = null;
- byte[] syncMarker;
- try (InputStream stream = Channels.newInputStream(FileSystems.open(fileResource))) {
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
-
- // The header of an object container file begins with a four-byte magic number, followed
- // by the file metadata (including the schema and codec), encoded as a map. Finally, the
- // header ends with the file's 16-byte sync marker.
- // See https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files for details on
- // the encoding of container files.
-
- // Read the magic number.
- byte[] magic = new byte[DataFileConstants.MAGIC.length];
- decoder.readFixed(magic);
- if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {
- throw new IOException("Missing Avro file signature: " + fileResource);
- }
-
- // Read the metadata to find the codec and schema.
- ByteBuffer valueBuffer = ByteBuffer.allocate(512);
- long numRecords = decoder.readMapStart();
- while (numRecords > 0) {
- for (long recordIndex = 0; recordIndex < numRecords; recordIndex++) {
- String key = decoder.readString();
- // readBytes() clears the buffer and returns a buffer where:
- // - position is the start of the bytes read
- // - limit is the end of the bytes read
- valueBuffer = decoder.readBytes(valueBuffer);
- byte[] bytes = new byte[valueBuffer.remaining()];
- valueBuffer.get(bytes);
- if (key.equals(DataFileConstants.CODEC)) {
- codec = new String(bytes, "UTF-8");
- } else if (key.equals(DataFileConstants.SCHEMA)) {
- schemaString = new String(bytes, "UTF-8");
- }
- }
- numRecords = decoder.mapNext();
- }
- if (codec == null) {
- codec = DataFileConstants.NULL_CODEC;
- }
-
- // Finally, read the sync marker.
- syncMarker = new byte[DataFileConstants.SYNC_SIZE];
- decoder.readFixed(syncMarker);
- }
- checkState(schemaString != null, "No schema present in Avro file metadata %s", fileResource);
- return new AvroMetadata(syncMarker, codec, schemaString);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5029f1ef/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index c822e8c..a6b6db5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -49,6 +49,7 @@ import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.AvroSource.AvroMetadata;
import org.apache.beam.sdk.io.AvroSource.AvroReader;
import org.apache.beam.sdk.io.AvroSource.AvroReader.Seeker;
import org.apache.beam.sdk.io.BlockBasedSource.BlockBasedReader;
@@ -58,7 +59,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.AvroUtils;
import org.apache.beam.sdk.util.SerializableUtils;
import org.hamcrest.Matchers;
import org.junit.Rule;
@@ -435,8 +435,8 @@ public class AvroSourceTest {
String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
- String schemaA = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
- String schemaB = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+ String schemaA = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+ String schemaB = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
assertNotSame(schemaA, schemaB);
AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA);
@@ -454,8 +454,8 @@ public class AvroSourceTest {
String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
- String schemaA = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
- String schemaB = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+ String schemaA = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+ String schemaB = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
assertNotSame(schemaA, schemaB);
AvroSource<GenericRecord> sourceA = (AvroSource<GenericRecord>) AvroSource.from(filename)
@@ -657,6 +657,37 @@ public class AvroSourceTest {
assertThat(displayData, hasDisplayItem("minBundleSize", 1234));
}
+ @Test
+ public void testReadMetadataWithCodecs() throws Exception {
+ // Test reading files generated using all codecs.
+ String codecs[] = {DataFileConstants.NULL_CODEC, DataFileConstants.BZIP2_CODEC,
+ DataFileConstants.DEFLATE_CODEC, DataFileConstants.SNAPPY_CODEC,
+ DataFileConstants.XZ_CODEC};
+ List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
+
+ for (String codec : codecs) {
+ String filename = generateTestFile(
+ codec, expected, SyncBehavior.SYNC_DEFAULT, 0, AvroCoder.of(Bird.class), codec);
+
+ Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
+ AvroMetadata metadata = AvroSource.readMetadataFromFile(fileMeta.resourceId());
+ assertEquals(codec, metadata.getCodec());
+ }
+ }
+
+ @Test
+ public void testReadSchemaString() throws Exception {
+ List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
+ String codec = DataFileConstants.NULL_CODEC;
+ String filename = generateTestFile(
+ codec, expected, SyncBehavior.SYNC_DEFAULT, 0, AvroCoder.of(Bird.class), codec);
+ Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
+ AvroMetadata metadata = AvroSource.readMetadataFromFile(fileMeta.resourceId());
+ // By default, parse validates the schema, which is what we want.
+ Schema schema = new Schema.Parser().parse(metadata.getSchemaString());
+ assertEquals(4, schema.getFields().size());
+ }
+
/**
* Class that will encode to a fixed size: 16 bytes.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/5029f1ef/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
deleted file mode 100644
index d42f8ed..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.Nullable;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.util.AvroUtils.AvroMetadata;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for AvroUtils.
- */
-@RunWith(JUnit4.class)
-public class AvroUtilsTest {
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- private static final int DEFAULT_RECORD_COUNT = 10000;
-
- /**
- * Generates an input Avro file containing the given records in the temporary directory and
- * returns the full path of the file.
- */
- @SuppressWarnings("deprecation") // test of internal functionality
- private <T> String generateTestFile(String filename, List<T> elems, AvroCoder<T> coder,
- String codec) throws IOException {
- File tmpFile = tmpFolder.newFile(filename);
- String path = tmpFile.toString();
-
- FileOutputStream os = new FileOutputStream(tmpFile);
- DatumWriter<T> datumWriter = coder.createDatumWriter();
- try (DataFileWriter<T> writer = new DataFileWriter<>(datumWriter)) {
- writer.setCodec(CodecFactory.fromString(codec));
- writer.create(coder.getSchema(), os);
- for (T elem : elems) {
- writer.append(elem);
- }
- }
- return path;
- }
-
- @Test
- public void testReadMetadataWithCodecs() throws Exception {
- // Test reading files generated using all codecs.
- String codecs[] = {DataFileConstants.NULL_CODEC, DataFileConstants.BZIP2_CODEC,
- DataFileConstants.DEFLATE_CODEC, DataFileConstants.SNAPPY_CODEC,
- DataFileConstants.XZ_CODEC};
- List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
-
- for (String codec : codecs) {
- String filename = generateTestFile(
- codec, expected, AvroCoder.of(Bird.class), codec);
-
- Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
- AvroMetadata metadata = AvroUtils.readMetadataFromFile(fileMeta.resourceId());
- assertEquals(codec, metadata.getCodec());
- }
- }
-
- @Test
- public void testReadSchemaString() throws Exception {
- List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
- String codec = DataFileConstants.NULL_CODEC;
- String filename = generateTestFile(
- codec, expected, AvroCoder.of(Bird.class), codec);
- Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
- AvroMetadata metadata = AvroUtils.readMetadataFromFile(fileMeta.resourceId());
- // By default, parse validates the schema, which is what we want.
- Schema schema = new Schema.Parser().parse(metadata.getSchemaString());
- assertEquals(8, schema.getFields().size());
- }
-
- /**
- * Pojo class used as the record type in tests.
- */
- @DefaultCoder(AvroCoder.class)
- static class Bird {
- long number;
- @Nullable String species;
- @Nullable Double quality;
- @Nullable Long quantity;
- @Nullable Long birthday; // Exercises TIMESTAMP.
- @Nullable Boolean flighted;
- @Nullable SubBird scion;
- SubBird[] associates;
-
- static class SubBird {
- @Nullable String species;
-
- public SubBird() {}
- }
-
- public Bird() {
- associates = new SubBird[1];
- associates[0] = new SubBird();
- }
- }
-
- /**
- * Create a list of n random records.
- */
- private static List<Bird> createRandomRecords(long n) {
- String[] species = {"pigeons", "owls", "gulls", "hawks", "robins", "jays"};
- Random random = new Random(0);
-
- List<Bird> records = new ArrayList<>();
- for (long i = 0; i < n; i++) {
- Bird bird = new Bird();
- bird.quality = random.nextDouble();
- bird.species = species[random.nextInt(species.length)];
- bird.number = i;
- bird.quantity = random.nextLong();
- records.add(bird);
- }
- return records;
- }
-}
[2/2] beam git commit: [BEAM-1871] Move functionality from AvroUtils to AvroSource hiding visibility of internal method, also remove deprecated method.
Posted by lc...@apache.org.
[BEAM-1871] Move functionality from AvroUtils to AvroSource hiding visibility of internal method, also remove deprecated method.
This closes #2706
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6dfd035
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6dfd035
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6dfd035
Branch: refs/heads/master
Commit: e6dfd035f8684f26308a159061ae2cf5eafaf95f
Parents: c7eccbb 5029f1e
Author: Luke Cwik <lc...@google.com>
Authored: Wed Apr 26 09:20:24 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Apr 26 09:20:24 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroSource.java | 108 ++++++++++++-
.../org/apache/beam/sdk/util/AvroUtils.java | 147 ------------------
.../org/apache/beam/sdk/io/AvroSourceTest.java | 41 ++++-
.../org/apache/beam/sdk/util/AvroUtilsTest.java | 152 -------------------
4 files changed, 141 insertions(+), 307 deletions(-)
----------------------------------------------------------------------