You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/28 01:05:14 UTC

[06/20] incubator-distributedlog git commit: DL-105: Make compression stats available per stream

DL-105: Make compression stats available per stream


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/a9cbb2c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/a9cbb2c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/a9cbb2c8

Branch: refs/heads/master
Commit: a9cbb2c844af8985ed249f423eccd1ebeaafb476
Parents: 316fd94
Author: Yiming Zang <yz...@twitter.com>
Authored: Wed Aug 10 16:18:53 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Tue Dec 27 16:49:27 2016 -0800

----------------------------------------------------------------------
 .../BKDistributedLogNamespace.java              | 20 ++++++++++++++------
 .../namespace/DistributedLogNamespace.java      |  5 ++++-
 .../distributedlog/TestAsyncReaderWriter.java   |  4 +++-
 .../service/stream/StreamImpl.java              |  3 ++-
 4 files changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 0f2c222..281c637 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -512,7 +512,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
                 logName,
                 ClientSharingOption.SharedClients,
                 Optional.<DistributedLogConfiguration>absent(),
-                Optional.<DynamicDistributedLogConfiguration>absent());
+                Optional.<DynamicDistributedLogConfiguration>absent(),
+                Optional.<StatsLogger>absent());
         dlm.delete();
     }
 
@@ -521,13 +522,15 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
             throws InvalidStreamNameException, IOException {
         return openLog(logName,
                 Optional.<DistributedLogConfiguration>absent(),
-                Optional.<DynamicDistributedLogConfiguration>absent());
+                Optional.<DynamicDistributedLogConfiguration>absent(),
+                Optional.<StatsLogger>absent());
     }
 
     @Override
     public DistributedLogManager openLog(String logName,
                                          Optional<DistributedLogConfiguration> logConf,
-                                         Optional<DynamicDistributedLogConfiguration> dynamicLogConf)
+                                         Optional<DynamicDistributedLogConfiguration> dynamicLogConf,
+                                         Optional<StatsLogger> perStreamStatsLogger)
             throws InvalidStreamNameException, IOException {
         validateName(logName);
         Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName));
@@ -539,7 +542,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
                 logName,
                 ClientSharingOption.SharedClients,
                 logConf,
-                dynamicLogConf);
+                dynamicLogConf,
+                perStreamStatsLogger);
     }
 
     @Override
@@ -780,7 +784,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
                 nameOfLogStream,
                 clientSharingOption,
                 logConfiguration,
-                dynamicLogConfiguration
+                dynamicLogConfiguration,
+                Optional.<StatsLogger>absent()
         );
     }
 
@@ -806,7 +811,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
             String nameOfLogStream,
             ClientSharingOption clientSharingOption,
             Optional<DistributedLogConfiguration> logConfiguration,
-            Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration)
+            Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration,
+            Optional<StatsLogger> perStreamStatsLogger)
         throws InvalidStreamNameException, IOException {
         // Make sure the name is well formed
         validateName(nameOfLogStream);
@@ -872,6 +878,8 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
             dlmLedgerAlloctor = this.allocator;
             dlmLogSegmentRollingPermitManager = this.logSegmentRollingPermitManager;
         }
+        // If a perStreamStatsLogger is specified, use it; otherwise fall back to the default one.
+        StatsLogger perLogStatsLogger = perStreamStatsLogger.or(this.perLogStatsLogger);
 
         return new BKDistributedLogManager(
                 nameOfLogStream,                    /* Log Name */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
index d42b5f2..b5abe9f 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
@@ -30,6 +30,8 @@ import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.bookkeeper.stats.StatsLogger;
+
 /**
  * A namespace is the basic unit for managing a set of distributedlogs.
  *
@@ -128,7 +130,8 @@ public interface DistributedLogNamespace {
      */
     DistributedLogManager openLog(String logName,
                                   Optional<DistributedLogConfiguration> logConf,
-                                  Optional<DynamicDistributedLogConfiguration> dynamicLogConf)
+                                  Optional<DynamicDistributedLogConfiguration> dynamicLogConf,
+                                  Optional<StatsLogger> perStreamStatsLogger)
             throws InvalidStreamNameException, IOException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 0c7f346..e5063cc 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.feature.FixedValueFeature;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -1979,7 +1980,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
         dlm = namespace.openLog(
                 name + "-custom",
                 Optional.<DistributedLogConfiguration>absent(),
-                Optional.of(dynConf));
+                Optional.of(dynConf),
+                Optional.<StatsLogger>absent());
         writer = dlm.startAsyncLogSegmentNonPartitioned();
         FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
         segments = dlm.getLogSegments();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/a9cbb2c8/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 3d5b9e7..e74ebbe 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -231,7 +231,8 @@ public class StreamImpl implements Stream {
     private DistributedLogManager openLog(String name) throws IOException {
         Optional<DistributedLogConfiguration> dlConf = Optional.<DistributedLogConfiguration>absent();
         Optional<DynamicDistributedLogConfiguration> dynDlConf = Optional.of(dynConf);
-        return dlNamespace.openLog(name, dlConf, dynDlConf);
+        Optional<StatsLogger> perStreamStatsLogger = Optional.of(streamLogger);
+        return dlNamespace.openLog(name, dlConf, dynDlConf, perStreamStatsLogger);
     }
 
     // Expensive initialization, only called once per stream.