You are viewing a plain-text version of this content. The canonical link to it appeared as a "here" hyperlink, which is not preserved in this plain-text rendering.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/16 01:43:39 UTC

kafka git commit: KAFKA-6167: Timestamp on streams directory contains a colon, which is an illegal character

Repository: kafka
Updated Branches:
  refs/heads/trunk 9eabcc20d -> 539c4d53f


KAFKA-6167: Timestamp on streams directory contains a colon, which is an illegal character

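 - changed the segment-name delimiter from `:` to `.` (`:` broke Kafka Streams on Windows)
 - added an upgrade path that renames existing segments from the old date-based (`-`) and the intermediate colon-based (`:`) naming formats
 - added tests for both upgrade paths (old date-based and intermediate colon-based names)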

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #4210 from mjsax/kafka-6167-windows-issue


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/539c4d53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/539c4d53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/539c4d53

Branch: refs/heads/trunk
Commit: 539c4d53f8fac65063e4e519c6a51911550a151f
Parents: 9eabcc2
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 15 17:43:35 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 15 17:43:35 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/state/internals/Segments.java | 53 +++++++++-----
 .../RocksDBSegmentedBytesStoreTest.java         | 26 ++++++-
 .../streams/state/internals/SegmentsTest.java   | 73 ++++++++++++++++----
 3 files changed, 120 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/539c4d53/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 7c6bb53..5993972 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -28,7 +28,6 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.SimpleTimeZone;
@@ -67,9 +66,11 @@ class Segments {
     }
 
     String segmentName(final long segmentId) {
-        // previous format used - as a separator so if this changes in the future
+        // (1) previous format used - as a separator so if this changes in the future
         // then we should use something different.
-        return name + ":" + segmentId * segmentInterval;
+        // (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
+        // so if this changes in the future then we should use something different.
+        return name + "." + segmentId * segmentInterval;
     }
 
     Segment getSegmentForTimestamp(final long timestamp) {
@@ -190,33 +191,49 @@ class Segments {
 
     private long segmentIdFromSegmentName(final String segmentName,
                                           final File parent) {
+        final int segmentSeparatorIndex = name.length();
+        final char segmentSeparator = segmentName.charAt(segmentSeparatorIndex);
+        final String segmentIdString = segmentName.substring(segmentSeparatorIndex + 1);
+        final long segmentId;
+
         // old style segment name with date
-        if (segmentName.charAt(name.length()) == '-') {
-            final String datePart = segmentName.substring(name.length() + 1);
-            final Date date;
+        if (segmentSeparator == '-') {
             try {
-                date = formatter.parse(datePart);
-                final long segmentId = date.getTime() / segmentInterval;
-                final File newName = new File(parent, segmentName(segmentId));
-                final File oldName = new File(parent, segmentName);
-                if (!oldName.renameTo(newName)) {
-                    throw new ProcessorStateException("Unable to rename old style segment from: "
-                                                              + oldName
-                                                              + " to new name: "
-                                                              + newName);
-                }
-                return segmentId;
+                segmentId = formatter.parse(segmentIdString).getTime() / segmentInterval;
             } catch (ParseException e) {
                 log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName);
                 return -1L;
             }
+            renameSegmentFile(parent, segmentName, segmentId);
         } else {
+            // for both new formats (with : or .) parse segment ID identically
             try {
-                return Long.parseLong(segmentName.substring(name.length() + 1)) / segmentInterval;
+                segmentId = Long.parseLong(segmentIdString) / segmentInterval;
             } catch (NumberFormatException e) {
                 throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName);
             }
+
+            // intermediate segment name with : breaks KafkaStreams on Windows OS -> rename segment file to new name with .
+            if (segmentSeparator == ':') {
+                renameSegmentFile(parent, segmentName, segmentId);
+            }
         }
 
+        return segmentId;
+
     }
+
+    private void renameSegmentFile(final File parent,
+                                   final String segmentName,
+                                   final long segmentId) {
+        final File newName = new File(parent, segmentName(segmentId));
+        final File oldName = new File(parent, segmentName);
+        if (!oldName.renameTo(newName)) {
+            throw new ProcessorStateException("Unable to rename old style segment from: "
+                + oldName
+                + " to new name: "
+                + newName);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/539c4d53/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 2dfc84b..bf3386d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -164,7 +164,7 @@ public class RocksDBSegmentedBytesStoreTest {
         bytesStore.close();
 
         final String firstSegmentName = segments.segmentName(0);
-        final String[] nameParts = firstSegmentName.split(":");
+        final String[] nameParts = firstSegmentName.split("\\.");
         final Long segmentId = Long.parseLong(nameParts[1]);
         final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
         formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
@@ -184,6 +184,30 @@ public class RocksDBSegmentedBytesStoreTest {
                                                   KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
     }
 
+    @Test
+    public void shouldLoadSegementsWithOldStyleColonFormattedName() {
+        final Segments segments = new Segments(storeName, retention, numSegments);
+        final String key = "a";
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L));
+        bytesStore.close();
+
+        final String firstSegmentName = segments.segmentName(0);
+        final String[] nameParts = firstSegmentName.split("\\.");
+        final File parent = new File(stateDir, storeName);
+        final File oldStyleName = new File(parent, nameParts[0] + ":" + Long.parseLong(nameParts[1]));
+        assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
+
+        bytesStore = new RocksDBSegmentedBytesStore(storeName,
+            retention,
+            numSegments,
+            schema);
+
+        bytesStore.init(context, bytesStore);
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60000L));
+        assertThat(results, equalTo(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 50L),
+            KeyValue.pair(new Windowed<>(key, new SessionWindow(30000L, 60000L)), 100L))));
+    }
 
     private Set<String> segmentDirs() {
         File windowDir = new File(stateDir, storeName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/539c4d53/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 0646b7e..65cfb21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -28,7 +28,10 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.List;
+import java.util.SimpleTimeZone;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -41,16 +44,19 @@ public class SegmentsTest {
     private MockProcessorContext context;
     private Segments segments;
     private long segmentInterval;
+    private File stateDirectory;
+    private String storeName = "test";
+    private final int retentionPeriod =  4 * 60 * 1000;
 
     @Before
     public void createContext() {
-        context = new MockProcessorContext(TestUtils.tempDirectory(),
+        stateDirectory = TestUtils.tempDirectory();
+        context = new MockProcessorContext(stateDirectory,
                                            Serdes.String(),
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
                                            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        int retentionPeriod = 4 * 60 * 1000;
-        segments = new Segments("test", retentionPeriod, NUM_SEGMENTS);
+        segments = new Segments(storeName, retentionPeriod, NUM_SEGMENTS);
         segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
     }
 
@@ -78,9 +84,9 @@ public class SegmentsTest {
 
     @Test
     public void shouldGetSegmentNameFromId() throws Exception {
-        assertEquals("test:0", segments.segmentName(0));
-        assertEquals("test:" + segmentInterval, segments.segmentName(1));
-        assertEquals("test:" + 2 * segmentInterval, segments.segmentName(2));
+        assertEquals("test.0", segments.segmentName(0));
+        assertEquals("test." + segmentInterval, segments.segmentName(1));
+        assertEquals("test." + 2 * segmentInterval, segments.segmentName(2));
     }
 
     @Test
@@ -88,9 +94,9 @@ public class SegmentsTest {
         final Segment segment1 = segments.getOrCreateSegment(0, context);
         final Segment segment2 = segments.getOrCreateSegment(1, context);
         final Segment segment3 = segments.getOrCreateSegment(2, context);
-        assertTrue(new File(context.stateDir(), "test/test:0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test:" + segmentInterval).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test:" + 2 * segmentInterval).isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test." + segmentInterval).isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test." + 2 * segmentInterval).isDirectory());
         assertEquals(true, segment1.isOpen());
         assertEquals(true, segment2.isOpen());
         assertEquals(true, segment3.isOpen());
@@ -100,7 +106,7 @@ public class SegmentsTest {
     public void shouldNotCreateSegmentThatIsAlreadyExpired() {
         segments.getOrCreateSegment(7, context);
         assertNull(segments.getOrCreateSegment(0, context));
-        assertFalse(new File(context.stateDir(), "test/test:0").exists());
+        assertFalse(new File(context.stateDir(), "test/test.0").exists());
     }
 
     @Test
@@ -111,9 +117,9 @@ public class SegmentsTest {
         assertFalse(segment1.isOpen());
         assertFalse(segment2.isOpen());
         assertTrue(segment3.isOpen());
-        assertFalse(new File(context.stateDir(), "test/test:0").exists());
-        assertFalse(new File(context.stateDir(), "test/test:" + segmentInterval).exists());
-        assertTrue(new File(context.stateDir(), "test/test:" + 7 * segmentInterval).exists());
+        assertFalse(new File(context.stateDir(), "test/test.0").exists());
+        assertFalse(new File(context.stateDir(), "test/test." + segmentInterval).exists());
+        assertTrue(new File(context.stateDir(), "test/test." + 7 * segmentInterval).exists());
     }
 
     @Test
@@ -203,6 +209,47 @@ public class SegmentsTest {
         verifyCorrectSegments(2, 5);
     }
 
+    @Test
+    public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
+        final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
+        final File storeDirectory = new File(storeDirectoryPath);
+        storeDirectory.mkdirs();
+
+        final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
+        formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
+
+        for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
+            final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + "-" + formatter.format(new Date(segmentId * segmentInterval)));
+            oldSegment.createNewFile();
+        }
+
+        segments.openExisting(context);
+
+        for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
+            final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+            assertTrue(newSegment.exists());
+        }
+    }
+
+    @Test
+    public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Exception {
+        final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
+        final File storeDirectory = new File(storeDirectoryPath);
+        storeDirectory.mkdirs();
+
+        for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
+            final File oldSegment = new File(storeDirectoryPath + File.separator + storeName + ":" + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+            oldSegment.createNewFile();
+        }
+
+        segments.openExisting(context);
+
+        for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
+            final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (retentionPeriod / (NUM_SEGMENTS - 1)));
+            assertTrue(newSegment.exists());
+        }
+    }
+
     private void verifyCorrectSegments(final long first, final int numSegments) {
         final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
         assertEquals(numSegments, result.size());