You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/13 14:44:55 UTC

[3/5] git commit: Add method to pass a schema for AvroOutputFormat

Add method to pass a schema for AvroOutputFormat


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f82c61fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f82c61fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f82c61fd

Branch: refs/heads/master
Commit: f82c61fdcf9cc8a77eb2aed0a38abc615e18ff03
Parents: f391957
Author: mingliang <qm...@gmail.com>
Authored: Sat Jun 7 15:21:05 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jun 13 14:28:04 2014 +0200

----------------------------------------------------------------------
 .../eu/stratosphere/api/java/io/AvroOutputFormat.java   | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f82c61fd/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/io/AvroOutputFormat.java b/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/io/AvroOutputFormat.java
index 6ec9449..a6eef66 100644
--- a/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/io/AvroOutputFormat.java
+++ b/stratosphere-addons/avro/src/main/java/eu/stratosphere/api/java/io/AvroOutputFormat.java
@@ -32,6 +32,8 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> {
 
 	private final Class<E> avroValueType;
 
+	private Schema userDefinedSchema = null;
+
 	private transient DataFileWriter<E> dataFileWriter;
 
 
@@ -44,6 +46,10 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> {
 		this.avroValueType = type;
 	}
 
+	public void setSchema(Schema schema) {
+		this.userDefinedSchema = schema;
+	}
+
 	@Override
 	public void writeRecord(E record) throws IOException {
 		dataFileWriter.append(record);
@@ -68,7 +74,11 @@ public class AvroOutputFormat<E> extends FileOutputFormat<E> {
 			schema = ReflectData.get().getSchema(avroValueType);
 		}
 		dataFileWriter = new DataFileWriter<E>(datumWriter);
-		dataFileWriter.create(schema, this.stream);
+		if (userDefinedSchema == null) {
+			dataFileWriter.create(schema, stream);
+		} else {
+			dataFileWriter.create(userDefinedSchema, stream);
+		}
 	}
 
 	@Override