You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/03/04 08:01:30 UTC

samza git commit: SAMZA-876: Add AvroDataFileHdfsWriter

Repository: samza
Updated Branches:
  refs/heads/master 84e5aad38 -> 9abcc8eec


SAMZA-876: Add AvroDataFileHdfsWriter


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9abcc8ee
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9abcc8ee
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9abcc8ee

Branch: refs/heads/master
Commit: 9abcc8eeceb54e1924e5da5dd2ff4f22a6a2ae43
Parents: 84e5aad
Author: Edi Bice <ed...@yahoo.com>
Authored: Thu Mar 3 23:00:27 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Mar 3 23:00:27 2016 -0800

----------------------------------------------------------------------
 .../documentation/versioned/hdfs/producer.md    | 14 ++-
 .../versioned/jobs/configuration-table.html     |  7 +-
 .../apache/samza/system/hdfs/HdfsConfig.scala   | 18 +++-
 .../hdfs/writer/AvroDataFileHdfsWriter.scala    | 78 +++++++++++++++++
 .../samza-hdfs-test-batch-job-avro.properties   | 19 +++++
 .../samza-hdfs-test-job-avro.properties         | 18 ++++
 .../hdfs/TestHdfsSystemProducerTestSuite.scala  | 90 +++++++++++++++++++-
 7 files changed, 234 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/docs/learn/documentation/versioned/hdfs/producer.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/hdfs/producer.md b/docs/learn/documentation/versioned/hdfs/producer.md
index cfd22c6..0865a35 100644
--- a/docs/learn/documentation/versioned/hdfs/producer.md
+++ b/docs/learn/documentation/versioned/hdfs/producer.md
@@ -21,7 +21,8 @@ title: Isolation
 
 ### Writing to HDFS from Samza
 
-The `samza-hdfs` module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use `HdfsSystemProducer`, and two `HdfsWriter`s: One that writes messages of raw bytes to a `SequenceFile` of `BytesWritable` keys and values. The other writes UTF-8 `String`s to a `SequenceFile` with `LongWritable` keys and `Text` values.
+The `samza-hdfs` module implements a Samza Producer to write to HDFS. The current implementation includes a ready-to-use `HdfsSystemProducer`, and three `HdfsWriter`s: One that writes messages of raw bytes to a `SequenceFile` of `BytesWritable` keys and values. Another writes UTF-8 `String`s to a `SequenceFile` with `LongWritable` keys and `Text` values. 
+The last one writes out Avro data files including the schema automatically reflected from the POJO objects fed to it. 
 
 ### Configuring an HdfsSystemProducer
 
@@ -33,6 +34,7 @@ You might configure the system producer for use by your `StreamTasks` like this:
 systems.hdfs-clickstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
 
 # define a serializer/deserializer for the hdfs-clickstream system
+# DO NOT define (i.e. comment out) a SerDe when using the AvroDataFileHdfsWriter so it can reflect the schema 
 systems.hdfs-clickstream.samza.msg.serde=some-serde-impl
 
 # consumer configs not needed for HDFS system, reader is not implemented yet 
@@ -42,8 +44,10 @@ systems.hdfs-clickstream.streams.metrics.samza.msg.serde=some-metrics-impl
 
 # Assign the implementation class for this system's HdfsWriter
 systems.hdfs-clickstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter
+#systems.hdfs-clickstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
 
-# Set HDFS SequenceFile compression type. Only BLOCK compression is supported currently
+# Set the compression type supported by the chosen Writer. SequenceFile writers currently support only BLOCK compression.
+# AvroDataFileHdfsWriter supports snappy, bzip2, deflate or none (null, anything other than the first three)
 systems.hdfs-clickstream.producer.hdfs.compression.type=snappy
 
 # The base dir for HDFS output. The default Bucketer for SequenceFile HdfsWriters
@@ -56,9 +60,11 @@ systems.hdfs-clickstream.producer.hdfs.bucketer.class=org.apache.samza.system.hd
 # Configure the DATE_PATH the Bucketer will set to bucket output files by day for this job run.
 systems.hdfs-clickstream.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd
 
-# Optionally set the max output bytes per file. A new file will be cut and output
-# continued on the next write call each time this many bytes are written.
+# Optionally set the max output bytes (records for AvroDataFileHdfsWriter) per file. 
+# A new file will be cut and output continued on the next write call each time this many bytes
+# (records for AvroDataFileHdfsWriter) are written.
 systems.hdfs-clickstream.producer.hdfs.write.batch.size.bytes=134217728
+#systems.hdfs-clickstream.producer.hdfs.write.batch.size.records=10000
 ```
 
 The above configuration assumes a Metrics and Serde implemnetation has been properly configured against the `some-serde-impl` and `some-metrics-impl` labels somewhere else in the same `job.properties` file. Each of these properties has a reasonable default, so you can leave out the ones you don't need to customize for your job run.

http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 175437c..2745a22 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1525,10 +1525,15 @@
                     <td class="description">A date format (using Java's SimpleDataFormat syntax) appropriate for use in an HDFS Path, which can configure time-based bucketing of output files.</td>
                 </tr>
                 <tr>
-                  <td class="property" id="hdfs-write-batch-size-bytes">systems.*.producer.write.batch.size.bytes</td>
+                  <td class="property" id="hdfs-write-batch-size-bytes">systems.*.producer.hdfs.write.batch.size.bytes</td>
                   <td class="default">268435456</td>
                   <td class="description">The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.</td>
                 </tr>
+                <tr>
+                    <td class="property" id="hdfs-write-batch-size-records">systems.*.producer.hdfs.write.batch.size.records</td>
+                    <td class="default">262144</td>
+                    <td class="description">The number of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 262144 if not set.</td>
+                </tr>
 
                 <tr>
                     <th colspan="3" class="section" id="task-migration">

http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
index 7993119..61b7570 100644
--- a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
@@ -43,8 +43,12 @@ object HdfsConfig {
   val BASE_OUTPUT_DIR_DEFAULT = "/user/%s/%s"
 
   // how much data to write before splitting off a new partfile
-  val WRITE_BATCH_SIZE = "systems.%s.producer.hdfs.write.batch.size.bytes"
-  val WRITE_BATCH_SIZE_DEFAULT = (1024L * 1024L * 256L).toString
+  val WRITE_BATCH_SIZE_BYTES = "systems.%s.producer.hdfs.write.batch.size.bytes"
+  val WRITE_BATCH_SIZE_BYTES_DEFAULT = (1024L * 1024L * 256L).toString
+
+  // how many records to write before splitting off a new partfile
+  val WRITE_BATCH_SIZE_RECORDS = "systems.%s.producer.hdfs.write.batch.size.records"
+  val WRITE_BATCH_SIZE_RECORDS_DEFAULT = (256L * 1024L).toString
 
   // human-readable compression type name to be interpreted/handled by the HdfsWriter impl
   val COMPRESSION_TYPE = "systems.%s.producer.hdfs.compression.type"
@@ -107,7 +111,15 @@ class HdfsConfig(config: Config) extends ScalaMapConfig(config) {
    * MapReduce utilization for Hadoop jobs that will process the data later.
    */
   def getWriteBatchSizeBytes(systemName: String): Long = {
-    getOrElse(HdfsConfig.WRITE_BATCH_SIZE format systemName, HdfsConfig.WRITE_BATCH_SIZE_DEFAULT).toLong
+    getOrElse(HdfsConfig.WRITE_BATCH_SIZE_BYTES format systemName, HdfsConfig.WRITE_BATCH_SIZE_BYTES_DEFAULT).toLong
+  }
+
+  /**
+    * Split output files from all writer tasks based on # of records written to optimize
+    * MapReduce utilization for Hadoop jobs that will process the data later.
+    *
+    * @param systemName name of the system, substituted into the
+    *                   `systems.%s.producer.hdfs.write.batch.size.records` config key
+    * @return the configured number of records per output file, or
+    *         `HdfsConfig.WRITE_BATCH_SIZE_RECORDS_DEFAULT` (256 * 1024) when the key is not set
+    */
+  def getWriteBatchSizeRecords(systemName: String): Long = {
+    getOrElse(HdfsConfig.WRITE_BATCH_SIZE_RECORDS format systemName, HdfsConfig.WRITE_BATCH_SIZE_RECORDS_DEFAULT).toLong
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
new file mode 100644
index 0000000..c3da611
--- /dev/null
+++ b/samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.hdfs.writer
+
+import org.apache.avro.file.{CodecFactory, DataFileWriter}
+import org.apache.avro.reflect.{ReflectData, ReflectDatumWriter}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.io.IOUtils
+import org.apache.hadoop.io.compress.{DefaultCodec, GzipCodec, SnappyCodec}
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.hdfs.HdfsConfig
+
+/**
+  * Implementation of HdfsWriter for Avro data files. Stores in a file a sequence of data conforming to a schema.
+  * The schema is stored in the file with the data. Each datum in a file is of the same schema.
+  * Data is grouped into blocks. A synchronization marker is written between blocks, so that files may be split.
+  * Blocks may be compressed.
+  *
+  * The schema of each output file is reflected from the class of the first record written to it.
+  * A new output file is started once `batchSize` records have been written to the current one,
+  * when the bucketer reports a bucket change, or when no file is currently open.
+  *
+  * @param dfs        file system the output files are created on
+  * @param systemName name of the system, used to look up this writer's configuration
+  * @param config     HDFS configuration providing the batch size and compression type
+  */
+class AvroDataFileHdfsWriter (dfs: FileSystem, systemName: String, config: HdfsConfig)
+  extends HdfsWriter[DataFileWriter[Object]](dfs, systemName, config) {
+
+  // maximum number of records per output file
+  val batchSize = config.getWriteBatchSizeRecords(systemName)
+  // always defined; kept as an Option so existing `bucketer.get` callers keep working
+  val bucketer = Some(Bucketer.getInstance(systemName, config))
+  // number of records appended to the currently open file
+  var recordsWritten = 0L
+
+  /** Flushes the currently open Avro file, if any. */
+  override def flush: Unit = writer.foreach { _.flush }
+
+  /**
+    * Appends the message of the given envelope to the current output file,
+    * first rolling over to a new file when one is required.
+    *
+    * @param outgoing envelope whose message is written as one Avro record
+    */
+  override def write(outgoing: OutgoingMessageEnvelope): Unit = {
+    val record = outgoing.getMessage
+    if (shouldStartNewOutputFile) {
+      close
+      writer = getNextWriter(record)
+    }
+
+    writer.foreach { seq =>
+      seq.append(record)
+      recordsWritten += 1
+    }
+  }
+
+  /**
+    * Flushes and closes the currently open file, if any, and resets the record counter.
+    * The stream is closed and the state is reset even when the flush fails;
+    * a flush failure is still propagated to the caller.
+    */
+  override def close: Unit = {
+    try {
+      writer.foreach { w =>
+        try w.flush
+        finally IOUtils.closeStream(w)
+      }
+    } finally {
+      writer = None
+      recordsWritten = 0L
+    }
+  }
+
+  /**
+    * @return true when the next record must go to a new file: the batch is full, the bucketer
+    *         asks for a new bucket, or no file is open (otherwise the record would be dropped)
+    */
+  protected def shouldStartNewOutputFile: Boolean = {
+    recordsWritten >= batchSize || bucketer.get.shouldChangeBucket || writer.isEmpty
+  }
+
+  /**
+    * Creates the writer for the next output file, using a schema reflected from the record's class.
+    *
+    * @param record first record of the new file; only its class is used here
+    * @return the opened Avro data file writer
+    */
+  protected def getNextWriter(record: Object): Option[DataFileWriter[Object]] = {
+    val path = bucketer.get.getNextWritePath(dfs)
+    val schema = ReflectData.get().getSchema(record.getClass)
+    val datumWriter = new ReflectDatumWriter[Object](schema)
+    val fileWriter = new DataFileWriter[Object](datumWriter)
+    val cn = config.getCompressionType(systemName)
+    // "none" leaves the DataFileWriter's codec untouched
+    if (cn != "none") fileWriter.setCodec(CodecFactory.fromString(cn))
+    val out = dfs.create(path)
+    try {
+      Some(fileWriter.create(schema, out))
+    } catch {
+      // do not leak the HDFS stream when the Avro file cannot be initialized
+      case scala.util.control.NonFatal(e) =>
+        IOUtils.closeStream(out)
+        throw e
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties
new file mode 100644
index 0000000..8a0927e
--- /dev/null
+++ b/samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-avro.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+systems.samza-hdfs-test-batch-job-avro.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
+systems.samza-hdfs-test-batch-job-avro.producer.hdfs.write.batch.size.records=10

http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties b/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties
new file mode 100644
index 0000000..f01148a
--- /dev/null
+++ b/samza-hdfs/src/test/resources/samza-hdfs-test-job-avro.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+systems.samza-hdfs-test-job-avro.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9abcc8ee/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
index c4b04a1..261310d 100644
--- a/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
+++ b/samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala
@@ -20,13 +20,16 @@
 package org.apache.samza.system.hdfs
 
 
-import java.io.{File, IOException}
+import java.io.{InputStreamReader, File, IOException}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import org.apache.avro.file.{SeekableFileInput, CodecFactory, DataFileWriter, DataFileReader}
+import org.apache.avro.reflect.{ReflectDatumReader, ReflectDatumWriter, ReflectData}
+import org.apache.avro.specific.SpecificDatumReader
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs._
 import org.apache.hadoop.hdfs.{DFSConfigKeys,MiniDFSCluster}
 import org.apache.hadoop.io.{SequenceFile, BytesWritable, LongWritable, Text}
 import org.apache.hadoop.io.SequenceFile.Reader
@@ -52,7 +55,9 @@ object TestHdfsSystemProducerTestSuite {
   val JOB_NAME = "samza-hdfs-test-job" // write some data as BytesWritable
   val BATCH_JOB_NAME = "samza-hdfs-test-batch-job" // write enough binary data to force the producer to split partfiles
   val TEXT_JOB_NAME = "samza-hdfs-test-job-text" // write some data as String
+  val AVRO_JOB_NAME = "samza-hdfs-test-job-avro" // write some data as Avro
   val TEXT_BATCH_JOB_NAME = "samza-hdfs-test-batch-job-text" // force a file split, understanding that Text does some compressing
+  val AVRO_BATCH_JOB_NAME = "samza-hdfs-test-batch-job-avro" // force a file split, understanding that Avro does some compressing
   val RESOURCE_PATH_FORMAT = "file://%s/src/test/resources/%s.properties"
   val TEST_DATE = (new SimpleDateFormat("yyyy_MM_dd-HH")).format(new Date)
 
@@ -60,6 +65,10 @@ object TestHdfsSystemProducerTestSuite {
   val EXPECTED = Array[String]("small_data", "medium_data", "large_data")
   val LUMP = new scala.util.Random().nextString(BATCH_SIZE)
 
+  // Simple record type written and read back by the Avro tests.
+  // NOTE(review): the no-argument constructor is presumably needed so Avro reflection can instantiate it - confirm.
+  case class AvroTestClass(a1: Long, b2: String) {
+    def this() = this(a1 = 0L, b2 = "")
+  }
+
   val hdfsFactory = new TestHdfsSystemFactory()
   val propsFactory = new PropertiesConfigFactory()
   val cluster = getMiniCluster
@@ -256,6 +265,83 @@ class TestHdfsSystemProducerTestSuite extends Logging {
     }
   }
 
+  /**
+    * Sends a single record through a producer configured with the Avro writer and checks that
+    * exactly one non-empty file is written and that the record read back equals the one sent.
+    */
+  @Test
+  def testHdfsSystemProducerAvroWrite {
+    var producer: Option[HdfsSystemProducer] = None
+
+    try {
+      producer = buildProducer(AVRO_JOB_NAME, cluster.get)
+      producer.get.register(TEST)
+      producer.get.start
+
+      Thread.sleep(PAUSE)
+
+      val systemStream = new SystemStream(AVRO_JOB_NAME, TEST)
+      val atc = new AvroTestClass(1280382045923456789L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
+      producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc))
+
+      // stopping the producer closes the output file so it can be read back below
+      producer.get.stop
+      producer = None
+
+      val dfs = cluster.get.getFileSystem
+      val results = dfs.listStatus(testWritePath(AVRO_JOB_NAME))
+      val bytesWritten = results.map(_.getLen).sum
+      assertEquals(1, results.length)
+      assertTrue(bytesWritten > 0L)
+
+      val atf = new AvroFSInput(FileContext.getFileContext(), results.head.getPath)
+      val schema = ReflectData.get().getSchema(atc.getClass)
+      val datumReader = new ReflectDatumReader[Object](schema)
+      val tfReader = DataFileReader.openReader(atf, datumReader)
+      try {
+        assertEquals(atc, tfReader.next())
+      } finally {
+        // release the reader even when the assertion fails
+        tfReader.close()
+      }
+
+    } finally {
+      producer.foreach { _.stop }
+    }
+  }
+
+  /**
+    * Sends 20 records through a producer whose Avro writer is configured to roll files every
+    * 10 records, and checks that two files are written, each holding 10 copies of the record.
+    */
+  @Test
+  def testHdfsSystemProducerWriteAvroBatches {
+    var producer: Option[HdfsSystemProducer] = None
+
+    try {
+      producer = buildProducer(AVRO_BATCH_JOB_NAME, cluster.get)
+
+      producer.get.start
+      producer.get.register(TEST)
+      Thread.sleep(PAUSE)
+
+      val systemStream = new SystemStream(AVRO_BATCH_JOB_NAME, TEST)
+      val atc = new AvroTestClass(1280382045923456789L, "alkjdsfafloiqulkjasoiuqlklakdsflkja")
+
+      (1 to 20).foreach { _ =>
+        producer.get.send(TEST, new OutgoingMessageEnvelope(systemStream, atc))
+      }
+
+      // stopping the producer closes the last output file so it can be read back below
+      producer.get.stop
+      producer = None
+
+      val dfs = cluster.get.getFileSystem
+      val results = dfs.listStatus(testWritePath(AVRO_BATCH_JOB_NAME))
+      // systems.samza-hdfs-test-batch-job-avro.producer.hdfs.write.batch.size.records=10
+      assertEquals(2, results.length)
+
+      val schema = ReflectData.get().getSchema(atc.getClass)
+      results.foreach { r =>
+        val atf = new AvroFSInput(FileContext.getFileContext(), r.getPath)
+        val datumReader = new ReflectDatumReader[Object](schema)
+        val tfReader = DataFileReader.openReader(atf, datumReader)
+        try {
+          // drain the whole file so the per-file record count is verified, not just the first record
+          val records = Iterator.continually(tfReader).takeWhile(_.hasNext).map(_.next()).toList
+          assertEquals(10, records.size)
+          records.foreach { rec => assertEquals(atc, rec) }
+        } finally {
+          // release the reader even when an assertion fails
+          tfReader.close()
+        }
+      }
+
+    } finally {
+      producer.foreach { _.stop }
+    }
+  }
+
 }