You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:33 UTC

[19/31] incubator-distributedlog git commit: DL-162: Use log segment entry store interface

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
index a7dead4..d1069c3 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
@@ -28,6 +28,7 @@ import com.twitter.distributedlog.TestZooKeeperClientBuilder;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
 import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
+import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.SchedulerUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
@@ -104,6 +105,10 @@ public class TestDLCK extends TestDistributedLogBase {
         zkc.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         com.twitter.distributedlog.DistributedLogManagerFactory factory =
                 new com.twitter.distributedlog.DistributedLogManagerFactory(confLocal, uri);
+        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+                .name("dlck-tool")
+                .corePoolSize(1)
+                .build();
         ExecutorService executorService = Executors.newCachedThreadPool();
 
         String streamName = "check-and-repair-dl-namespace";
@@ -119,7 +124,7 @@ public class TestDLCK extends TestDistributedLogBase {
         BookKeeperClient bkc = getBookKeeperClient(factory);
         DistributedLogAdmin.checkAndRepairDLNamespace(uri, factory,
                 new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(factory)),
-                executorService, bkc, confLocal.getBKDigestPW(), false, false);
+                scheduler, bkc, confLocal.getBKDigestPW(), false, false);
 
         Map<Long, LogSegmentMetadata> segments = getLogSegments(dlm);
         LOG.info("segments after drynrun {}", segments);
@@ -132,7 +137,7 @@ public class TestDLCK extends TestDistributedLogBase {
         bkc = getBookKeeperClient(factory);
         DistributedLogAdmin.checkAndRepairDLNamespace(uri, factory,
                 LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(factory)),
-                executorService, bkc, confLocal.getBKDigestPW(), false, false);
+                scheduler, bkc, confLocal.getBKDigestPW(), false, false);
 
         segments = getLogSegments(dlm);
         LOG.info("segments after repair {}", segments);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
index 88840a0..4cf86fa 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
@@ -91,7 +91,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase {
                                               DistributedLogConfiguration conf)
             throws Exception {
         LogSegmentEntryStore store = new BKLogSegmentEntryStore(
-                conf, bkc.get(), scheduler, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL);
+                conf, bkc, scheduler, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL);
         return (BKLogSegmentEntryReader) FutureUtils.result(store.openReader(segment, startEntryId));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
index 645b666..20c81f3 100644
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
+++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
@@ -95,7 +95,7 @@ public class DistributedLogInputFormat
         final AtomicReference<LedgerMetadata> metadataHolder = new AtomicReference<LedgerMetadata>(null);
         for (LogSegmentMetadata segment : segments) {
             final CountDownLatch latch = new CountDownLatch(1);
-            lm.readLedgerMetadata(segment.getLedgerId(),
+            lm.readLedgerMetadata(segment.getLogSegmentId(),
                     new BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>() {
                 @Override
                 public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java
index 6dfc6aa..f8b98f7 100644
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java
+++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java
@@ -56,7 +56,7 @@ class LogSegmentReader extends RecordReader<DLSN, LogRecordWithDLSN> {
         this.metadata = split.getMetadata();
         try {
             this.lh = bk.openLedgerNoRecovery(
-                    split.getLedgerId(),
+                    split.getLogSegmentId(),
                     BookKeeper.DigestType.CRC32,
                     conf.getBKDigestPW().getBytes(UTF_8));
         } catch (BKException e) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java
index 58f3e9d..89e9d44 100644
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java
+++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java
@@ -53,8 +53,8 @@ public class LogSegmentSplit extends InputSplit implements Writable {
         return logSegmentMetadata;
     }
 
-    public long getLedgerId() {
-        return logSegmentMetadata.getLedgerId();
+    public long getLogSegmentId() {
+        return logSegmentMetadata.getLogSegmentId();
     }
 
     @Override