You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:33 UTC
[19/31] incubator-distributedlog git commit: DL-162: Use log segment entry store interface
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
index a7dead4..d1069c3 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/admin/TestDLCK.java
@@ -28,6 +28,7 @@ import com.twitter.distributedlog.TestZooKeeperClientBuilder;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
+import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.SchedulerUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
@@ -104,6 +105,10 @@ public class TestDLCK extends TestDistributedLogBase {
zkc.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
com.twitter.distributedlog.DistributedLogManagerFactory factory =
new com.twitter.distributedlog.DistributedLogManagerFactory(confLocal, uri);
+ OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+ .name("dlck-tool")
+ .corePoolSize(1)
+ .build();
ExecutorService executorService = Executors.newCachedThreadPool();
String streamName = "check-and-repair-dl-namespace";
@@ -119,7 +124,7 @@ public class TestDLCK extends TestDistributedLogBase {
BookKeeperClient bkc = getBookKeeperClient(factory);
DistributedLogAdmin.checkAndRepairDLNamespace(uri, factory,
new DryrunLogSegmentMetadataStoreUpdater(confLocal, getLogSegmentMetadataStore(factory)),
- executorService, bkc, confLocal.getBKDigestPW(), false, false);
+ scheduler, bkc, confLocal.getBKDigestPW(), false, false);
Map<Long, LogSegmentMetadata> segments = getLogSegments(dlm);
LOG.info("segments after drynrun {}", segments);
@@ -132,7 +137,7 @@ public class TestDLCK extends TestDistributedLogBase {
bkc = getBookKeeperClient(factory);
DistributedLogAdmin.checkAndRepairDLNamespace(uri, factory,
LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, getLogSegmentMetadataStore(factory)),
- executorService, bkc, confLocal.getBKDigestPW(), false, false);
+ scheduler, bkc, confLocal.getBKDigestPW(), false, false);
segments = getLogSegments(dlm);
LOG.info("segments after repair {}", segments);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
index 88840a0..4cf86fa 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/logsegment/TestBKLogSegmentEntryReader.java
@@ -91,7 +91,7 @@ public class TestBKLogSegmentEntryReader extends TestDistributedLogBase {
DistributedLogConfiguration conf)
throws Exception {
LogSegmentEntryStore store = new BKLogSegmentEntryStore(
- conf, bkc.get(), scheduler, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL);
+ conf, bkc, scheduler, NullStatsLogger.INSTANCE, AsyncFailureInjector.NULL);
return (BKLogSegmentEntryReader) FutureUtils.result(store.openReader(segment, startEntryId));
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
index 645b666..20c81f3 100644
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
+++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/DistributedLogInputFormat.java
@@ -95,7 +95,7 @@ public class DistributedLogInputFormat
final AtomicReference<LedgerMetadata> metadataHolder = new AtomicReference<LedgerMetadata>(null);
for (LogSegmentMetadata segment : segments) {
final CountDownLatch latch = new CountDownLatch(1);
- lm.readLedgerMetadata(segment.getLedgerId(),
+ lm.readLedgerMetadata(segment.getLogSegmentId(),
new BookkeeperInternalCallbacks.GenericCallback<LedgerMetadata>() {
@Override
public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java
index 6dfc6aa..f8b98f7 100644
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java
+++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentReader.java
@@ -56,7 +56,7 @@ class LogSegmentReader extends RecordReader<DLSN, LogRecordWithDLSN> {
this.metadata = split.getMetadata();
try {
this.lh = bk.openLedgerNoRecovery(
- split.getLedgerId(),
+ split.getLogSegmentId(),
BookKeeper.DigestType.CRC32,
conf.getBKDigestPW().getBytes(UTF_8));
} catch (BKException e) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d871e657/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java
index 58f3e9d..89e9d44 100644
--- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java
+++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/com/twitter/distributedlog/mapreduce/LogSegmentSplit.java
@@ -53,8 +53,8 @@ public class LogSegmentSplit extends InputSplit implements Writable {
return logSegmentMetadata;
}
- public long getLedgerId() {
- return logSegmentMetadata.getLedgerId();
+ public long getLogSegmentId() {
+ return logSegmentMetadata.getLogSegmentId();
}
@Override