You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by dc...@apache.org on 2022/03/05 00:37:14 UTC

[samza] branch master updated: SAMZA-2727: Fix UTFDataFormatException for FileUtil (#1587)

This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 858ed3f  SAMZA-2727: Fix UTFDataFormatException for FileUtil (#1587)
858ed3f is described below

commit 858ed3fec4ea96c18c9514f7786514aff0836b72
Author: Daniel Chen <xr...@uwaterloo.ca>
AuthorDate: Fri Mar 4 16:37:08 2022 -0800

    SAMZA-2727: Fix UTFDataFormatException for FileUtil (#1587)
    
    * SAMZA-2727: Fix UTFDataFormatException for FileUtil
    
    * Fix variable names and readability
    
    * Fix MaxStringSegmentSize off by 1
---
 .../scala/org/apache/samza/util/FileUtil.scala     | 18 ++++++++-
 .../scala/org/apache/samza/util/TestFileUtil.scala | 47 ++++++++++++++++++----
 2 files changed, 56 insertions(+), 9 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
index edea6ae..f9e7583 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
@@ -26,9 +26,12 @@ import java.nio.file._
 import java.util.zip.CRC32
 
 class FileUtil extends Logging {
+  val MaxStringSegmentWriteSize: Int = 64 * 1024
+
   /**
     * Writes checksum & data to a file
     * Checksum is pre-fixed to the data and is a 32-bit long type data.
+    * The data must contain only ASCII characters to be correctly serialized.
     * @param file The file handle to write to
     * @param data The data to be written to the file
     * */
@@ -42,7 +45,15 @@ class FileUtil extends Logging {
       fos = new FileOutputStream(tmpFile)
       oos = new ObjectOutputStream(fos)
       oos.writeLong(checksum)
-      oos.writeUTF(data)
+
+      var remainingDataSegment = data
+      // Split data into writable segments
+      while (remainingDataSegment.length >= MaxStringSegmentWriteSize) {
+        val splitData  = remainingDataSegment.splitAt(MaxStringSegmentWriteSize - 1)
+        oos.writeUTF(splitData._1)
+        remainingDataSegment = splitData._2
+      }
+      oos.writeUTF(remainingDataSegment)
     } finally {
       if (oos != null) oos.close()
       if (fos != null) fos.close()
@@ -109,7 +120,10 @@ class FileUtil extends Logging {
       fis = new FileInputStream(file)
       ois = new ObjectInputStream(fis)
       val checksumFromFile = ois.readLong()
-      val data = ois.readUTF()
+      var data = ois.readUTF()
+      while (ois.available() > 0) {
+        data = data + ois.readUTF()
+      }
       if(checksumFromFile == getChecksum(data)) {
         data
       } else {
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
index 104f983..ebbd021 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
@@ -21,7 +21,7 @@
 
 package org.apache.samza.util
 
-import org.apache.samza.testUtils.FileUtil
+import org.apache.commons.lang3.RandomStringUtils
 
 import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
 import org.junit.Assert.{assertEquals, assertNull, assertTrue, fail}
@@ -33,8 +33,9 @@ import scala.util.Random
 class TestFileUtil {
   val data = "100"
   val fileUtil = new FileUtil()
-  val checksum = fileUtil.getChecksum(data)
-  val file = new File(System.getProperty("java.io.tmpdir"), "test")
+  val checksum: Long = fileUtil.getChecksum(data)
+  val tmpDir: String = System.getProperty("java.io.tmpdir")
+  val file = new File(tmpDir, "test")
 
   @Test
   def testWriteDataToFile() {
@@ -54,6 +55,26 @@ class TestFileUtil {
   }
 
   @Test
+  def testWriteLargeDataToFile() {
+    val largeData = RandomStringUtils.randomAscii(fileUtil.MaxStringSegmentWriteSize * 2)
+    val largeChecksum = fileUtil.getChecksum(largeData)
+
+    // Invoke test
+    fileUtil.writeWithChecksum(file, largeData)
+
+    // Check that file exists
+    assertTrue("File was not created!", file.exists())
+    val fis = new FileInputStream(file)
+    val ois = new ObjectInputStream(fis)
+
+    // Check content of the file is as expected
+    assertEquals(largeChecksum, ois.readLong())
+    assertEquals(largeData, fileUtil.readWithChecksum(file))
+    ois.close()
+    fis.close()
+  }
+
+  @Test
   def testWriteDataToFileWithExistingOffsetFile() {
     // Invoke test
     val file = new File(System.getProperty("java.io.tmpdir"), "test2")
@@ -93,6 +114,18 @@ class TestFileUtil {
   }
 
   @Test
+  def testReadDataFromBackwardsCompatFile() {
+    // Write
+    fileUtil.writeWithChecksum(file, data)
+
+    // Invoke test
+    val result = fileUtil.readWithChecksum(file)
+
+    // Check data returned
+    assertEquals(data, result)
+  }
+
+  @Test
   def testReadInvalidDataFromFile() {
     // Write garbage to produce a null result when it's read
     val fos = new FileOutputStream(file)
@@ -123,14 +156,14 @@ class TestFileUtil {
      * /tmp/samza-file-util-RANDOM-symlink (symlink to dir above)
      * /tmp/samza-file-util-RANDOM/subdir (created via the symlink above)
      */
-    val tmpDirPath = Paths.get(FileUtil.TMP_DIR)
+    val tmpDirPath = Paths.get(tmpDir)
     val tmpSubDirName = "samza-file-util-" + Random.nextInt()
     val tmpSubDirSymlinkName = tmpSubDirName + "-symlink"
 
-    val tmpSubDirPath = Paths.get(FileUtil.TMP_DIR, tmpSubDirName);
+    val tmpSubDirPath = Paths.get(tmpDir, tmpSubDirName);
     fileUtil.createDirectories(tmpSubDirPath)
 
-    val tmpSymlinkPath = Paths.get(FileUtil.TMP_DIR, tmpSubDirSymlinkName)
+    val tmpSymlinkPath = Paths.get(tmpDir, tmpSubDirSymlinkName)
     Files.createSymbolicLink(tmpSymlinkPath, tmpDirPath);
 
     try {
@@ -146,7 +179,7 @@ class TestFileUtil {
     fileUtil.createDirectories(tmpSymlinkPath)
 
     // verify that subdirs can be created via symlinks correctly.
-    val tmpSubSubDirPath = Paths.get(FileUtil.TMP_DIR, tmpSubDirName + "-symlink", "subdir")
+    val tmpSubSubDirPath = Paths.get(tmpDir, tmpSubDirName + "-symlink", "subdir")
     fileUtil.createDirectories(tmpSubSubDirPath)
   }
 }