You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/16 01:43:39 UTC
kafka git commit: KAFKA-6167: Timestamp on streams directory contains a colon, which is an illegal character
Repository: kafka
Updated Branches:
refs/heads/trunk 9eabcc20d -> 539c4d53f
KAFKA-6167: Timestamp on streams directory contains a colon, which is an illegal character
- change segment delimiter from ':' to '.'
- added upgrade path
- added test for old and new upgrade path
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #4210 from mjsax/kafka-6167-windows-issue
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/539c4d53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/539c4d53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/539c4d53
Branch: refs/heads/trunk
Commit: 539c4d53f8fac65063e4e519c6a51911550a151f
Parents: 9eabcc2
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 15 17:43:35 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 15 17:43:35 2017 -0800
----------------------------------------------------------------------
.../kafka/streams/state/internals/Segments.java | 53 +++++++++-----
.../RocksDBSegmentedBytesStoreTest.java | 26 ++++++-
.../streams/state/internals/SegmentsTest.java | 73 ++++++++++++++++----
3 files changed, 120 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/539c4d53/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 7c6bb53..5993972 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -28,7 +28,6 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SimpleTimeZone;
@@ -67,9 +66,11 @@ class Segments {
}
String segmentName(final long segmentId) {
- // previous format used - as a separator so if this changes in the future
+ // (1) previous format used - as a separator so if this changes in the future
// then we should use something different.
- return name + ":" + segmentId * segmentInterval;
+ // (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
+ // so if this changes in the future then we should use something different.
+ return name + "." + segmentId * segmentInterval;
}
Segment getSegmentForTimestamp(final long timestamp) {
@@ -190,33 +191,49 @@ class Segments {
private long segmentIdFromSegmentName(final String segmentName,
final File parent) {
+ final int segmentSeparatorIndex = name.length();
+ final char segmentSeparator = segmentName.charAt(segmentSeparatorIndex);
+ final String segmentIdString = segmentName.substring(segmentSeparatorIndex + 1);
+ final long segmentId;
+
// old style segment name with date
- if (segmentName.charAt(name.length()) == '-') {
- final String datePart = segmentName.substring(name.length() + 1);
- final Date date;
+ if (segmentSeparator == '-') {
try {
- date = formatter.parse(datePart);
- final long segmentId = date.getTime() / segmentInterval;
- final File newName = new File(parent, segmentName(segmentId));
- final File oldName = new File(parent, segmentName);
- if (!oldName.renameTo(newName)) {
- throw new ProcessorStateException("Unable to rename old style segment from: "
- + oldName
- + " to new name: "
- + newName);
- }
- return segmentId;
+ segmentId = formatter.parse(segmentIdString).getTime() / segmentInterval;
} catch (ParseException e) {
log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName);
return -1L;
}
+ renameSegmentFile(parent, segmentName, segmentId);
} else {
+ // for both new formats (with : or .) parse segment ID identically
try {
- return Long.parseLong(segmentName.substring(name.length() + 1)) / segmentInterval;
+ segmentId = Long.parseLong(segmentIdString) / segmentInterval;
} catch (NumberFormatException e) {
throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName);
}
+
+ // intermediate segment name with : breaks KafkaStreams on Windows OS -> rename segment file to new name with .
+ if (segmentSeparator == ':') {
+ renameSegmentFile(parent, segmentName, segmentId);
+ }
}
+ return segmentId;
+
}
+
+ private void renameSegmentFile(final File parent,
+ final String segmentName,
+ final long segmentId) {
+ final File newName = new File(parent, segmentName(segmentId));
+ final File oldName = new File(parent, segmentName);
+ if (!oldName.renameTo(newName)) {
+ throw new ProcessorStateException("Unable to rename old style segment from: "
+ + oldName
+ + " to new name: "
+ + newName);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/539c4d53/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 2dfc84b..bf3386d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -164,7 +164,7 @@ public class RocksDBSegmentedBytesStoreTest {
bytesStore.close();
final String firstSegmentName = segments.segmentName(0);
- final String[] nameParts = firstSegmentName.split(":");
+ final String[] nameParts = firstSegmentName.split("\\.");
final Long segmentId = Long.parseLong(nameParts[1]);
final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
@@ -184,6 +184,30 @@ public class RocksDBSegmentedBytesStoreTest {
KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
}
+ @Test
+ public void shouldLoadSegementsWithOldStyleColonFormattedName() {
+ final Segments segments = new Segments(storeName, retention, numSegments);
+ final String key = "a";
+ bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+ bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
+ bytesStore.close();
+
+ final String firstSegmentName = segments.segmentName(0);
+ final String[] nameParts = firstSegmentName.split("\\.");
+ final File parent = new File(stateDir, storeName);
+ final File oldStyleName = new File(parent, nameParts[0] + ":" + Long.parseLong(nameParts[1]));
+ assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
+
+ bytesStore = new RocksDBSegmentedBytesStore(storeName,
+ retention,
+ numSegments,
+ schema);
+
+ bytesStore.init(context, bytesStore);
+ final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
+ assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
+ KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
+ }
private Set<String> segmentDirs() {
File windowDir = new File(stateDir, storeName);
http://git-wip-us.apache.org/repos/asf/kafka/blob/539c4d53/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 0646b7e..65cfb21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -28,7 +28,10 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.List;
+import java.util.SimpleTimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -41,16 +44,19 @@ public class SegmentsTest {
private MockProcessorContext context;
private Segments segments;
private long segmentInterval;
+ private File stateDirectory;
+ private String storeName = "test";
+ private final int retentionPeriod = 4 * 60 * 1000;
@Before
public void createContext() {
- context = new MockProcessorContext(TestUtils.tempDirectory(),
+ stateDirectory = TestUtils.tempDirectory();
+ context = new MockProcessorContext(stateDirectory,
Serdes.String(),
Serdes.Long(),
new NoOpRecordCollector(),
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
- int retentionPeriod = 4 * 60 * 1000;
- segments = new Segments("test", retentionPeriod, NUM_SEGMENTS);
+ segments = new Segments(storeName, retentionPeriod, NUM_SEGMENTS);
segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
}
@@ -78,9 +84,9 @@ public class SegmentsTest {
@Test
public void shouldGetSegmentNameFromId() throws Exception {
- assertEquals("test:0", segments.segmentName(0));
- assertEquals("test:" + segmentInterval, segments.segmentName(1));
- assertEquals("test:" + 2 * segmentInterval, segments.segmentName(2));
+ assertEquals("test.0", segments.segmentName(0));
+ assertEquals("test." + segmentInterval, segments.segmentName(1));
+ assertEquals("test." + 2 * segmentInterval, segments.segmentName(2));
}
@Test
@@ -88,9 +94,9 @@ public class SegmentsTest {
final Segment segment1 = segments.getOrCreateSegment(0, context);
final Segment segment2 = segments.getOrCreateSegment(1, context);
final Segment segment3 = segments.getOrCreateSegment(2, context);
- assertTrue(new File(context.stateDir(), "test/test:0").isDirectory());
- assertTrue(new File(context.stateDir(), "test/test:" + segmentInterval).isDirectory());
- assertTrue(new File(context.stateDir(), "test/test:" + 2 * segmentInterval).isDirectory());
+ assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
+ assertTrue(new File(context.stateDir(), "test/test." + segmentInterval).isDirectory());
+ assertTrue(new File(context.stateDir(), "test/test." + 2 * segmentInterval).isDirectory());
assertEquals(true, segment1.isOpen());
assertEquals(true, segment2.isOpen());
assertEquals(true, segment3.isOpen());
@@ -100,7 +106,7 @@ public class SegmentsTest {
public void shouldNotCreateSegmentThatIsAlreadyExpired() {
segments.getOrCreateSegment(7, context);
assertNull(segments.getOrCreateSegment(0, context));
- assertFalse(new File(context.stateDir(), "test/test:0").exists());
+ assertFalse(new File(context.stateDir(), "test/test.0").exists());
}
@Test
@@ -111,9 +117,9 @@ public class SegmentsTest {
assertFalse(segment1.isOpen());
assertFalse(segment2.isOpen());
assertTrue(segment3.isOpen());
- assertFalse(new File(context.stateDir(), "test/test:0").exists());
- assertFalse(new File(context.stateDir(), "test/test:" + segmentInterval).exists());
- assertTrue(new File(context.stateDir(), "test/test:" + 7 * segmentInterval).exists());
+ assertFalse(new File(context.stateDir(), "test/test.0").exists());
+ assertFalse(new File(context.stateDir(), "test/test." + segmentInterval).exists());
+ assertTrue(new File(context.stateDir(), "test/test." + 7 * segmentInterval).exists());
}
@Test
@@ -203,6 +209,47 @@ public class SegmentsTest {
verifyCorrectSegments(2, 5);
}
+ @Test
+ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
+ final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
+ final File storeDirectory = new File(storeDirectoryPath);
+ storeDirectory.mkdirs();
+
+ final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
+ formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
+
+ for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
+ final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval)));
+ oldSegment.createNewFile();
+ }
+
+ segments.openExisting(context);
+
+ for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
+ final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+ assertTrue(newSegment.exists());
+ }
+ }
+
+ @Test
+ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Exception {
+ final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
+ final File storeDirectory = new File(storeDirectoryPath);
+ storeDirectory.mkdirs();
+
+ for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
+ final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+ oldSegment.createNewFile();
+ }
+
+ segments.openExisting(context);
+
+ for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
+ final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+ assertTrue(newSegment.exists());
+ }
+ }
+
private void verifyCorrectSegments(final long first, final int numSegments) {
final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
assertEquals(numSegments, result.size());