You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/03/04 08:01:30 UTC
samza git commit: SAMZA-876: Add AvroDataFileHdfsWriter
Repository: samza
Updated Branches:
refs/heads/master 84e5aad38 -> 9abcc8eec
SAMZA-876: Add AvroDataFileHdfsWriter
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9abcc8ee
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9abcc8ee
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9abcc8ee
Branch: refs/heads/master
Commit: 9abcc8eeceb54e1924e5da5dd2ff4f22a6a2ae43
Parents: 84e5aad
Author: Edi Bice <ed...@yahoo.com>
Authored: Thu Mar 3 23:00:27 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Mar 3 23:00:27 2016 -0800
----------------------------------------------------------------------
.../documentation/versioned/hdfs/producer.md | 14 ++-
.../versioned/jobs/configuration-table.html | 7 +-
.../apache/samza/system/hdfs/HdfsConfig.scala | 18 +++-
.../hdfs/writer/AvroDataFileHdfsWriter.scala | 78 +++++++++++++++++
.../samza-hdfs-test-batch-job-avro.properties | 19 +++++
.../samza-hdfs-test-job-avro.properties | 18 ++++
.../hdfs/TestHdfsSystemProducerTestSuite.scala | 90 +++++++++++++++++++-
7 files changed, 234 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/docs/learn/documentation/versioned/hdfs/producer.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/hdfs/producer.md b/docs/learn/documentation/versioned/hdfs/producer.md
index cfd22c6..0865a35 100644
--- a/docs/learn/documentation/versioned/hdfs/producer.md
+++ b/docs/learn/documentation/versioned/hdfs/producer.md
@@ -21,7 +21,8 @@ title: Isolation
### Writing to HDFS from Samza
-The `samza-hdfs` module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use `HdfsSystemProducer`, and two `HdfsWriter`s: One that writes messages of raw bytes to a `SequenceFile` of `BytesWritable` keys and values. The other writes UTF-8 `String`s to a `SequenceFile` with `LongWritable` keys and `Text` values.
+The `samza-hdfs` module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use `HdfsSystemProducer`, and three `HdfsWriter`s: One that writes messages of raw bytes to a `SequenceFile` of `BytesWritable` keys and values. Another writes UTF-8 `String`s to a `SequenceFile` with `LongWritable` keys and `Text` values.
+The last one writes Avro data files, embedding a schema that is automatically reflected from the POJOs fed to it.
### Configuring an HdfsSystemProducer
@@ -33,6 +34,7 @@ You might configure the system producer for use by your `StreamTasks` like this:
systems.hdfs-clickstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
# define a serializer/deserializer for the hdfs-clickstream system
+# Do NOT define a SerDe (i.e. comment out the line below) when using the AvroDataFileHdfsWriter, so that it receives the original objects and can reflect their schema
systems.hdfs-clickstream.samza.msg.serde=some-serde-impl
# consumer configs not needed for HDFS system, reader is not implemented yet
@@ -42,8 +44,10 @@ systems.hdfs-clickstream.streams.metrics.samza.msg.serde=some-metrics-impl
# Assign the implementation class for this system's HdfsWriter
systems.hdfs-clickstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
+#systems.hdfs-clickstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
-# Set HDFS SequenceFile compression type. Only BLOCK compression is supported currently
+# Set the compression type supported by the chosen HdfsWriter. The SequenceFile writers currently support only BLOCK compression.
+# AvroDataFileHdfsWriter supports snappy, bzip2 or deflate; use none (or null) to disable compression. Other values are rejected by Avro.
systems.hdfs-clickstream.producer.hdfs.compression.type=snappy
# The base dir for HDFS output. The default Bucketer for SequenceFile HdfsWriters
@@ -56,9 +60,11 @@ systems.hdfs-clickstream.producer.hdfs.bucketer.class=org.apache.samza.system.hd
# Configure the DATE_PATH the Bucketer will set to bucket output files by day for this job run.
systems.hdfs-clickstream.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd
-# Optionally set the max output bytes per file. A new file will be cut and output
-# continued on the next write call each time this many bytes are written.
+# Optionally set the max output bytes per file (SequenceFile writers) or, via write.batch.size.records,
+# the max output records per file (AvroDataFileHdfsWriter). A new file will be cut and output continued
+# on the next write call each time this many bytes (or records) are written.
systems.hdfs-clickstream.producer.hdfs.write.batch.size.bytes=134217728
+#systems.hdfs-clickstream.producer.hdfs.write.batch.size.records=10000
```
The above configuration assumes a Metrics and Serde implementation has been properly configured against the `some-serde-impl` and `some-metrics-impl` labels somewhere else in the same `job.properties` file. Each of these properties has a reasonable default, so you can leave out the ones you don't need to customize for your job run.
http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 175437c..2745a22 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1525,10 +1525,15 @@
<td class="description">A date format (using Java's SimpleDateFormat syntax) appropriate for use in an HDFS Path, which can configure time-based bucketing of output files.</td>
</tr>
<tr>
- <td class="property" id="hdfs-write-batch-size-bytes">systems.*.producer.write.batch.size.bytes</td>
+ <td class="property" id="hdfs-write-batch-size-bytes">systems.*.producer.hdfs.write.batch.size.bytes</td>
<td class="default">268435456</td>
<td class="description">The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.</td>
</tr>
+ <tr>
+ <td class="property" id="hdfs-write-batch-size-records">systems.*.producer.hdfs.write.batch.size.records</td>
+ <td class="default">262144</td>
+ <td class="description">The number of outgoing messages (records) to write to each HDFS output file before cutting a new file. Used by the AvroDataFileHdfsWriter. Defaults to 262144 if not set.</td>
+ </tr>
<tr>
<th colspan="3" class="section" id="task-migration">
http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
index 7993119..61b7570 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
@@ -43,8 +43,12 @@ object HdfsConfig {
val BASE_OUTPUT_DIR_DEFAULT = "/user/%s/%s"
// how many bytes of data to write before splitting off a new partfile
- val WRITE_BATCH_SIZE = "systems.%s.producer.hdfs.write.batch.size.bytes"
- val WRITE_BATCH_SIZE_DEFAULT = (1024L * 1024L * 256L).toString
+ val WRITE_BATCH_SIZE_BYTES = "systems.%s.producer.hdfs.write.batch.size.bytes"
+ val WRITE_BATCH_SIZE_BYTES_DEFAULT = (1024L * 1024L * 256L).toString // 256 MB
+
+ // how many records (messages) to write before splitting off a new partfile
+ val WRITE_BATCH_SIZE_RECORDS = "systems.%s.producer.hdfs.write.batch.size.records"
+ val WRITE_BATCH_SIZE_RECORDS_DEFAULT = (256L * 1024L).toString // 262144 records
// human-readable compression type name to be interpreted/handled by the HdfsWriter impl
val COMPRESSION_TYPE = "systems.%s.producer.hdfs.compression.type"
@@ -107,7 +111,15 @@ class HdfsConfig(config: Config) extends ScalaMapConfig(config) {
* MapReduce utilization for Hadoop jobs that will process the data later.
*/
def getWriteBatchSizeBytes(systemName: String): Long = {
- getOrElse(HdfsConfig.WRITE_BATCH_SIZE format systemName, HdfsConfig.WRITE_BATCH_SIZE_DEFAULT).toLong
+ getOrElse(HdfsConfig.WRITE_BATCH_SIZE_BYTES format systemName, HdfsConfig.WRITE_BATCH_SIZE_BYTES_DEFAULT).toLong
+ }
+
+ /**
+ * Split output files from all writer tasks based on # of records written to optimize
+ * MapReduce utilization for Hadoop jobs that will process the data later.
+ * Read from WRITE_BATCH_SIZE_RECORDS; falls back to WRITE_BATCH_SIZE_RECORDS_DEFAULT
+ * (256 * 1024 records) when unset. Consumed by record-counting writers such as
+ * AvroDataFileHdfsWriter.
+ */
+ def getWriteBatchSizeRecords(systemName: String): Long = {
+ getOrElse(HdfsConfig.WRITE_BATCH_SIZE_RECORDS format systemName, HdfsConfig.WRITE_BATCH_SIZE_RECORDS_DEFAULT).toLong
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
new file mode 100644
index 0000000..c3da611
--- /dev/null
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs.writer
+
+import org.apache.avro.file.{CodecFactory, DataFileWriter}
+import org.apache.avro.reflect.{ReflectData, ReflectDatumWriter}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.io.IOUtils
+import org.apache.hadoop.io.compress.{DefaultCodec, GzipCodec, SnappyCodec}
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.hdfs.HdfsConfig
+
+/**
+ * Implementation of HdfsWriter for Avro data files. Stores in a file a sequence of data conforming to a schema.
+ * The schema is stored in the file with the data. Each datum in a file is of the same schema.
+ * Data is grouped into blocks. A synchronization marker is written between blocks, so that files may be split.
+ * Blocks may be compressed.
+ *
+ * The schema of each output file is reflected (via ReflectData) from the class of the first message
+ * written to it, so every message routed to one file must be of that class.
+ *
+ * A new output file is started when no file is open, when `batchSize` records (see
+ * HdfsConfig.getWriteBatchSizeRecords) have been appended to the current file, or when the
+ * Bucketer reports a bucket change. The codec comes from HdfsConfig.getCompressionType; the value
+ * "none" leaves DataFileWriter's default codec in place.
+ */
+class AvroDataFileHdfsWriter (dfs: FileSystem, systemName: String, config: HdfsConfig)
+  extends HdfsWriter[DataFileWriter[Object]](dfs, systemName, config) {
+
+  /** Maximum number of records appended to one output file before a new one is cut. */
+  val batchSize: Long = config.getWriteBatchSizeRecords(systemName)
+  val bucketer = Some(Bucketer.getInstance(systemName, config))
+  /** Records appended to the currently open file; reset to 0 whenever that file is closed. */
+  var recordsWritten: Long = 0L
+
+  override def flush: Unit = writer.foreach { _.flush }
+
+  override def write(outgoing: OutgoingMessageEnvelope): Unit = {
+    val record = outgoing.getMessage
+    if (shouldStartNewOutputFile) {
+      close
+      writer = getNextWriter(record)
+    }
+
+    writer.foreach { fileWriter =>
+      fileWriter.append(record)
+      recordsWritten += 1
+    }
+  }
+
+  override def close: Unit = {
+    try {
+      // flush explicitly so write errors surface: IOUtils.closeStream ignores exceptions on close
+      writer.foreach { _.flush }
+    } finally {
+      // always release the stream and reset per-file state, even when the flush failed
+      writer.foreach(w => IOUtils.closeStream(w))
+      writer = None
+      recordsWritten = 0L
+    }
+  }
+
+  /**
+   * True when the next record must go to a fresh file. `writer.isEmpty` covers the first write
+   * and any write after close(); without it such records would be dropped until the bucket changed.
+   */
+  protected def shouldStartNewOutputFile: Boolean = {
+    writer.isEmpty || recordsWritten >= batchSize || bucketer.get.shouldChangeBucket
+  }
+
+  /**
+   * Opens the next bucketed output file with a schema reflected from `record`'s class.
+   *
+   * @param record the first datum that will be appended to the new file
+   * @return the opened DataFileWriter, wrapped in Some
+   */
+  protected def getNextWriter(record: Object): Option[DataFileWriter[Object]] = {
+    import scala.util.control.NonFatal
+
+    val path = bucketer.get.getNextWritePath(dfs)
+    val schema = ReflectData.get().getSchema(record.getClass)
+    val fileWriter = new DataFileWriter[Object](new ReflectDatumWriter[Object](schema))
+    val codecName = config.getCompressionType(systemName)
+    // the codec must be chosen before create() writes the file header
+    if (codecName != "none") fileWriter.setCodec(CodecFactory.fromString(codecName))
+    val out = dfs.create(path)
+    try {
+      Some(fileWriter.create(schema, out))
+    } catch {
+      case NonFatal(e) =>
+        // create() failed after the HDFS stream was opened; close it rather than leak it
+        IOUtils.closeStream(out)
+        throw e
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties
new file mode 100644
index 0000000..8a0927e
--- /dev/null
+++ b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+systems.samza-hdfs-test-batch-job-avro.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
+systems.samza-hdfs-test-batch-job-avro.producer.hdfs.write.batch.size.records=10
http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties
new file mode 100644
index 0000000..f01148a
--- /dev/null
+++ b/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+systems.samza-hdfs-test-job-avro.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
index c4b04a1..261310d 100644
--- a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
+++ b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
@@ -20,13 +20,16 @@
package org.apache.samza.system.hdfs
-import java.io.{File, IOException}
+import java.io.{InputStreamReader, File, IOException}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date
+import org.apache.avro.file.{SeekableFileInput, CodecFactory, DataFileWriter, DataFileReader}
+import org.apache.avro.reflect.{ReflectDatumReader, ReflectDatumWriter, ReflectData}
+import org.apache.avro.specific.SpecificDatumReader
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs._
import org.apache.hadoop.hdfs.{DFSConfigKeys,MiniDFSCluster}
import org.apache.hadoop.io.{SequenceFile, BytesWritable, LongWritable, Text}
import org.apache.hadoop.io.SequenceFile.Reader
@@ -52,7 +55,9 @@ object TestHdfsSystemProducerTestSuite {
val JOB_NAME = "samza-hdfs-test-job" // write some data as BytesWritable
val BATCH_JOB_NAME = "samza-hdfs-test-batch-job" // write enough binary data to force the producer to split partfiles
val TEXT_JOB_NAME = "samza-hdfs-test-job-text" // write some data as String
+ val AVRO_JOB_NAME = "samza-hdfs-test-job-avro" // write a reflected POJO to an Avro data file
val TEXT_BATCH_JOB_NAME = "samza-hdfs-test-batch-job-text" // force a file split, understanding that Text does some compressing
+ val AVRO_BATCH_JOB_NAME = "samza-hdfs-test-batch-job-avro" // force a file split via write.batch.size.records=10 (records, not bytes)
val RESOURCE_PATH_FORMAT = "file://%s/src/test/resources/%s.properties"
val TEST_DATE = (new SimpleDateFormat("yyyy_MM_dd-HH")).format(new Date)
@@ -60,6 +65,10 @@ object TestHdfsSystemProducerTestSuite {
val EXPECTED = Array[String]("small_data", "medium_data", "large_data")
val LUMP = new scala.util.Random().nextString(BATCH_SIZE)
+ // POJO sent through AvroDataFileHdfsWriter by the Avro tests; a case class so that records
+ // read back from HDFS compare structurally with ==/assertEquals.
+ case class AvroTestClass(a1: Long, b2: String) {
+ // no-arg constructor, presumably needed by ReflectDatumReader to instantiate records on read — confirm
+ def this() = this(0L, "")
+ }
+
val hdfsFactory = new TestHdfsSystemFactory()
val propsFactory = new PropertiesConfigFactory()
val cluster = getMiniCluster
@@ -256,6 +265,83 @@ class TestHdfsSystemProducerTestSuite extends Logging {
}
}
+  /**
+   * Sends one reflected POJO through an AvroDataFileHdfsWriter-backed producer and checks that
+   * exactly one non-empty Avro data file containing exactly that record is written.
+   */
+  @Test
+  def testHdfsSystemProducerAvroWrite {
+    var producer: Option[HdfsSystemProducer] = None
+
+    try {
+      producer = buildProducer(AVRO_JOB_NAME, cluster.get)
+      producer.get.register(TEST)
+      producer.get.start
+
+      Thread.sleep(PAUSE)
+
+      val systemStream = new SystemStream(AVRO_JOB_NAME, TEST)
+      val atc = new AvroTestClass(1280382045923456789L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
+      producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc))
+
+      producer.get.stop
+      producer = None
+
+      val dfs = cluster.get.getFileSystem
+      val results = dfs.listStatus(testWritePath(AVRO_JOB_NAME))
+      val bytesWritten = results.map(_.getLen).sum
+      assertEquals(1, results.length)
+      assertTrue(bytesWritten > 0L)
+
+      // read the file back with the reflected schema; it must hold exactly the one record sent
+      val schema = ReflectData.get().getSchema(atc.getClass)
+      val reader = DataFileReader.openReader(
+        new AvroFSInput(FileContext.getFileContext(), results.head.getPath),
+        new ReflectDatumReader[Object](schema))
+      try {
+        assertTrue(reader.hasNext)
+        assertEquals(atc, reader.next())
+        assertTrue(!reader.hasNext)
+      } finally {
+        reader.close()
+      }
+    } finally {
+      producer.foreach { _.stop }
+    }
+  }
+
+  /**
+   * Sends more records than write.batch.size.records allows per file and checks that the output is
+   * split into two Avro data files which together contain every record sent.
+   */
+  @Test
+  def testHdfsSystemProducerWriteAvroBatches {
+    var producer: Option[HdfsSystemProducer] = None
+
+    try {
+      producer = buildProducer(AVRO_BATCH_JOB_NAME, cluster.get)
+
+      // register the source before starting, matching testHdfsSystemProducerAvroWrite
+      producer.get.register(TEST)
+      producer.get.start
+      Thread.sleep(PAUSE)
+
+      val systemStream = new SystemStream(AVRO_BATCH_JOB_NAME, TEST)
+      val atc = new AvroTestClass(1280382045923456789L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
+      val messageCount = 20
+
+      (1 to messageCount).foreach { _ =>
+        producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc))
+      }
+
+      producer.get.stop
+      producer = None
+
+      val dfs = cluster.get.getFileSystem
+      val results = dfs.listStatus(testWritePath(AVRO_BATCH_JOB_NAME))
+      // systems.samza-hdfs-test-batch-job-avro.producer.hdfs.write.batch.size.records=10,
+      // so 20 records must be split across exactly 2 files
+      assertEquals(2, results.length)
+
+      // every record in every file must round-trip, and none may be lost across the split
+      val schema = ReflectData.get().getSchema(atc.getClass)
+      val recordsRead = results.foldLeft(0) { (total, status) =>
+        val reader = DataFileReader.openReader(
+          new AvroFSInput(FileContext.getFileContext(), status.getPath),
+          new ReflectDatumReader[Object](schema))
+        try {
+          var inFile = 0
+          while (reader.hasNext) {
+            assertEquals(atc, reader.next())
+            inFile += 1
+          }
+          total + inFile
+        } finally {
+          reader.close()
+        }
+      }
+      assertEquals(messageCount, recordsRead)
+
+    } finally {
+      producer.foreach { _.stop }
+    }
+  }
+
}