You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by dc...@apache.org on 2022/03/05 00:37:14 UTC
[samza] branch master updated: SAMZA-2727: Fix UTFDataFormatException for FileUtil (#1587)
This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 858ed3f SAMZA-2727: Fix UTFDataFormatException for FileUtil (#1587)
858ed3f is described below
commit 858ed3fec4ea96c18c9514f7786514aff0836b72
Author: Daniel Chen <xr...@uwaterloo.ca>
AuthorDate: Fri Mar 4 16:37:08 2022 -0800
SAMZA-2727: Fix UTFDataFormatException for FileUtil (#1587)
* SAMZA-2727: Fix UTFDataFormatException for FileUtil
* Fix variable names and readability
* Fix MaxStringSegmentSize off by 1
---
.../scala/org/apache/samza/util/FileUtil.scala | 18 ++++++++-
.../scala/org/apache/samza/util/TestFileUtil.scala | 47 ++++++++++++++++++----
2 files changed, 56 insertions(+), 9 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
index edea6ae..f9e7583 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
@@ -26,9 +26,12 @@ import java.nio.file._
import java.util.zip.CRC32
class FileUtil extends Logging {
+ val MaxStringSegmentWriteSize: Int = 64 * 1024
+
/**
* Writes checksum & data to a file
* Checksum is pre-fixed to the data and is a 32-bit long type data.
+ * The data must contain only ASCII characters to be correctly serialized.
* @param file The file handle to write to
* @param data The data to be written to the file
* */
@@ -42,7 +45,15 @@ class FileUtil extends Logging {
fos = new FileOutputStream(tmpFile)
oos = new ObjectOutputStream(fos)
oos.writeLong(checksum)
- oos.writeUTF(data)
+
+ var remainingDataSegment = data
+ // Split data into writable segments
+ while (remainingDataSegment.length >= MaxStringSegmentWriteSize) {
+ val splitData = remainingDataSegment.splitAt(MaxStringSegmentWriteSize - 1)
+ oos.writeUTF(splitData._1)
+ remainingDataSegment = splitData._2
+ }
+ oos.writeUTF(remainingDataSegment)
} finally {
if (oos != null) oos.close()
if (fos != null) fos.close()
@@ -109,7 +120,10 @@ class FileUtil extends Logging {
fis = new FileInputStream(file)
ois = new ObjectInputStream(fis)
val checksumFromFile = ois.readLong()
- val data = ois.readUTF()
+ var data = ois.readUTF()
+ while (ois.available() > 0) {
+ data = data + ois.readUTF()
+ }
if(checksumFromFile == getChecksum(data)) {
data
} else {
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
index 104f983..ebbd021 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
@@ -21,7 +21,7 @@
package org.apache.samza.util
-import org.apache.samza.testUtils.FileUtil
+import org.apache.commons.lang3.RandomStringUtils
import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import org.junit.Assert.{assertEquals, assertNull, assertTrue, fail}
@@ -33,8 +33,9 @@ import scala.util.Random
class TestFileUtil {
val data = "100"
val fileUtil = new FileUtil()
- val checksum = fileUtil.getChecksum(data)
- val file = new File(System.getProperty("java.io.tmpdir"), "test")
+ val checksum: Long = fileUtil.getChecksum(data)
+ val tmpDir: String = System.getProperty("java.io.tmpdir")
+ val file = new File(tmpDir, "test")
@Test
def testWriteDataToFile() {
@@ -54,6 +55,26 @@ class TestFileUtil {
}
@Test
+ def testWriteLargeDataToFile() {
+ val largeData = RandomStringUtils.randomAscii(fileUtil.MaxStringSegmentWriteSize * 2)
+ val largeChecksum = fileUtil.getChecksum(largeData)
+
+ // Invoke test
+ fileUtil.writeWithChecksum(file, largeData)
+
+ // Check that file exists
+ assertTrue("File was not created!", file.exists())
+ val fis = new FileInputStream(file)
+ val ois = new ObjectInputStream(fis)
+
+ // Check content of the file is as expected
+ assertEquals(largeChecksum, ois.readLong())
+ assertEquals(largeData, fileUtil.readWithChecksum(file))
+ ois.close()
+ fis.close()
+ }
+
+ @Test
def testWriteDataToFileWithExistingOffsetFile() {
// Invoke test
val file = new File(System.getProperty("java.io.tmpdir"), "test2")
@@ -93,6 +114,18 @@ class TestFileUtil {
}
@Test
+ def testReadDataFromBackwardsCompatFile() {
+ // Write
+ fileUtil.writeWithChecksum(file, data)
+
+ // Invoke test
+ val result = fileUtil.readWithChecksum(file)
+
+ // Check data returned
+ assertEquals(data, result)
+ }
+
+ @Test
def testReadInvalidDataFromFile() {
// Write garbage to produce a null result when it's read
val fos = new FileOutputStream(file)
@@ -123,14 +156,14 @@ class TestFileUtil {
* /tmp/samza-file-util-RANDOM-symlink (symlink to dir above)
* /tmp/samza-file-util-RANDOM/subdir (created via the symlink above)
*/
- val tmpDirPath = Paths.get(FileUtil.TMP_DIR)
+ val tmpDirPath = Paths.get(tmpDir)
val tmpSubDirName = "samza-file-util-" + Random.nextInt()
val tmpSubDirSymlinkName = tmpSubDirName + "-symlink"
- val tmpSubDirPath = Paths.get(FileUtil.TMP_DIR, tmpSubDirName);
+ val tmpSubDirPath = Paths.get(tmpDir, tmpSubDirName);
fileUtil.createDirectories(tmpSubDirPath)
- val tmpSymlinkPath = Paths.get(FileUtil.TMP_DIR, tmpSubDirSymlinkName)
+ val tmpSymlinkPath = Paths.get(tmpDir, tmpSubDirSymlinkName)
Files.createSymbolicLink(tmpSymlinkPath, tmpDirPath);
try {
@@ -146,7 +179,7 @@ class TestFileUtil {
fileUtil.createDirectories(tmpSymlinkPath)
// verify that subdirs can be created via symlinks correctly.
- val tmpSubSubDirPath = Paths.get(FileUtil.TMP_DIR, tmpSubDirName + "-symlink", "subdir")
+ val tmpSubSubDirPath = Paths.get(tmpDir, tmpSubDirName + "-symlink", "subdir")
fileUtil.createDirectories(tmpSubSubDirPath)
}
}