You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/07/26 05:22:35 UTC

svn commit: r1365873 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-server/src/main/java/org/apache/hedwig/admin/console/ hedwig-se...

Author: sijie
Date: Thu Jul 26 03:22:34 2012
New Revision: 1365873

URL: http://svn.apache.org/viewvc?rev=1365873&view=rev
Log:
BOOKKEEPER-330: System.currentTimeMillis usage in Hedwig (uma via sijie)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java   (with props)
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Jul 26 03:22:34 2012
@@ -52,6 +52,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-302: No more messages delivered when hub server scans messages over two ledgers. (sijie via ivank)
 
+        BOOKKEEPER-330: System.currentTimeMillis usage in Hedwig (uma via sijie)
+
     IMPROVEMENTS:
 
       bookkeeper-server:

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkPublisher.java Thu Jul 26 03:22:34 2012
@@ -18,6 +18,8 @@
 package org.apache.hedwig.client.benchmark;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.api.Publisher;
 import org.apache.hedwig.client.api.Subscriber;
@@ -98,7 +100,7 @@ public class BenchmarkPublisher extends 
             myPublishCount++;
         }
 
-        long startTime = System.currentTimeMillis();
+        long startTime = MathUtils.now();
         int myPublishLimit = numMessages / numRegions / numPartitions - myPublishCount;
         myPublishCount = 0;
         ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator("acked pubs", myPublishLimit, nParallel);
@@ -117,7 +119,7 @@ public class BenchmarkPublisher extends 
             ByteString topic = ByteString.copyFromUtf8(HedwigBenchmark.TOPIC_PREFIX + topicNum);
 
             if (rate > 0) {
-                long delay = startTime + (long) (1000 * myPublishCount / rate) - System.currentTimeMillis();
+                long delay = startTime + (long) (1000 * myPublishCount / rate) - MathUtils.now();
                 if (delay > 0)
                     Thread.sleep(delay);
             }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkSubscriber.java Thu Jul 26 03:22:34 2012
@@ -25,6 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.api.Subscriber;
 import org.apache.hedwig.client.benchmark.BenchmarkUtils.BenchmarkCallback;
@@ -120,7 +122,7 @@ public class BenchmarkSubscriber extends
 
     void multiSub(String label, String topicPrefix, int start, final int npar, final int count)
             throws InterruptedException {
-        long startTime = System.currentTimeMillis();
+        long startTime = MathUtils.now();
         ThroughputLatencyAggregator agg = new ThroughputLatencyAggregator(label, count / numPartitions, npar);
         agg.startProgress();
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/benchmark/BenchmarkUtils.java Thu Jul 26 03:22:34 2012
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.util.Callback;
 
@@ -32,7 +33,7 @@ public class BenchmarkUtils {
     static final Logger logger = LoggerFactory.getLogger(BenchmarkUtils.class);
 
     public static double calcTp(final int count, long startTime) {
-        return 1000. * count / (System.currentTimeMillis() - startTime);
+        return 1000. * count / (MathUtils.now() - startTime);
     }
 
     /**
@@ -142,7 +143,7 @@ public class BenchmarkUtils {
 
         public void ding(boolean failed) {
             int snapDone = done.incrementAndGet();
-            earliest.compareAndSet(0, System.currentTimeMillis());
+            earliest.compareAndSet(0, MathUtils.now());
             if (failed)
                 numFailed.incrementAndGet();
             if (logger.isDebugEnabled())
@@ -167,11 +168,11 @@ public class BenchmarkUtils {
             this.agg = agg;
             agg.outstanding.acquire();
             // Must set the start time *after* taking acquiring on outstanding.
-            startTime = System.currentTimeMillis();
+            startTime = MathUtils.now();
         }
 
         private void finish(boolean failed) {
-            agg.reportLatency(System.currentTimeMillis() - startTime);
+            agg.reportLatency(MathUtils.now() - startTime);
             agg.tpAgg.ding(failed);
             agg.outstanding.release();
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java Thu Jul 26 03:22:34 2012
@@ -36,6 +36,8 @@ import org.jboss.netty.channel.ChannelFu
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.api.Client;
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.PubSubData;
@@ -168,7 +170,7 @@ public class HedwigClientImpl implements
                 logger.debug("Running the PubSubRequest Timeout Task");
             // Loop through all outstanding PubSubData requests and check if
             // the requestWriteTime has timed out compared to the current time.
-            long curTime = System.currentTimeMillis();
+            long curTime = MathUtils.now();
             long timeoutInterval = cfg.getServerAckResponseTimeout();
 
             // First check the ResponseHandlers associated with cached

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java Thu Jul 26 03:22:34 2012
@@ -27,6 +27,8 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.api.Publisher;
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.data.PubSubData;
@@ -175,7 +177,7 @@ public class HedwigPublisher implements 
 
         // Update the PubSubData with the txnId and the requestWriteTime
         pubSubData.txnId = txnId;
-        pubSubData.requestWriteTime = System.currentTimeMillis();
+        pubSubData.requestWriteTime = MathUtils.now();
 
         // Before we do the write, store this information into the
         // ResponseHandler so when the server responds, we know what

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Thu Jul 26 03:22:34 2012
@@ -29,6 +29,8 @@ import org.jboss.netty.channel.ChannelFu
 import org.jboss.netty.channel.ChannelFutureListener;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.api.MessageHandler;
 import org.apache.hedwig.client.api.Subscriber;
 import org.apache.hedwig.client.conf.ClientConfiguration;
@@ -395,7 +397,7 @@ public class HedwigSubscriber implements
 
         // Update the PubSubData with the txnId and the requestWriteTime
         pubSubData.txnId = txnId;
-        pubSubData.requestWriteTime = System.currentTimeMillis();
+        pubSubData.requestWriteTime = MathUtils.now();
 
         // Before we do the write, store this information into the
         // ResponseHandler so when the server responds, we know what

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/console/HedwigConsole.java Thu Jul 26 03:22:34 2012
@@ -37,6 +37,7 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.hedwig.admin.HedwigAdmin;
 import org.apache.hedwig.client.api.MessageHandler;
@@ -422,7 +423,7 @@ public class HedwigConsole {
             if (args.length < 5) {
                 return false;
             }
-            final long startTime = System.currentTimeMillis();
+            final long startTime = MathUtils.now();
 
             final ByteString topic = ByteString.copyFromUtf8(args[1]);
             final ByteString subId = ByteString.copyFromUtf8(args[2] + "-" + startTime);
@@ -481,7 +482,7 @@ public class HedwigConsole {
 
                 // wait for the message
                 success = isDone.await(timeoutSecs, TimeUnit.SECONDS);
-                elapsedTime = System.currentTimeMillis() - startTime;
+                elapsedTime = MathUtils.now() - startTime;
             } finally {
                 try {
                     if (subscribed) {
@@ -879,7 +880,7 @@ public class HedwigConsole {
             return false;
         }
 
-        long startTime = System.currentTimeMillis();
+        long startTime = MathUtils.now();
         boolean success = false;
         try {
             success = myCommand.runCmd(args);
@@ -887,7 +888,7 @@ public class HedwigConsole {
             e.printStackTrace();
             success = false;
         }
-        long elapsedTime = System.currentTimeMillis() - startTime;
+        long elapsedTime = MathUtils.now() - startTime;
         if (inConsole) {
             if (success) {
                 System.out.println("Finished " + ((double)elapsedTime / 1000) + " s.");

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/AbstractBenchmark.java Thu Jul 26 03:22:34 2012
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.util.ConcurrencyUtils;
 
 public abstract class AbstractBenchmark {
@@ -57,7 +58,7 @@ public abstract class AbstractBenchmark 
                 return;
             }
 
-            totalLatency.addAndGet(System.currentTimeMillis() - (Long)ctx);
+            totalLatency.addAndGet(MathUtils.now() - (Long)ctx);
             int numDoneInt = numDone.incrementAndGet();
 
             if (logging && numDoneInt % 10000 == 0) {
@@ -71,7 +72,7 @@ public abstract class AbstractBenchmark 
     }
 
     public void runPhase(String phase, int numOps) throws Exception {
-        long startTime = System.currentTimeMillis();
+        long startTime = MathUtils.now();
 
         doOps(numOps);
 
@@ -79,7 +80,7 @@ public abstract class AbstractBenchmark 
             logger.error("One or more operations failed in phase: " + phase);
             throw new RuntimeException();
         } else {
-            logger.info("Phase: " + phase + " Avg latency : " + totalLatency.get() / numOps + ", tput = " + (numOps * 1000/ (System.currentTimeMillis() - startTime)));
+            logger.info("Phase: " + phase + " Avg latency : " + totalLatency.get() / numOps + ", tput = " + (numOps * 1000/ (MathUtils.now() - startTime)));
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookieBenchmark.java Thu Jul 26 03:22:34 2012
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.proto.Booki
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -83,7 +84,7 @@ public class BookieBenchmark extends Abs
             buffer.put(passwd);
             buffer.rewind();
             ChannelBuffer toSend = ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer.slice()), ChannelBuffers.wrappedBuffer(data));
-            bkc.addEntry(addr, ledgerId, passwd, i, toSend, callback, System.currentTimeMillis(), 0);
+            bkc.addEntry(addr, ledgerId, passwd, i, toSend, callback, MathUtils.now(), 0);
         }
 
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/benchmark/BookkeeperBenchmark.java Thu Jul 26 03:22:34 2012
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.client.Book
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +74,7 @@ public class BookkeeperBenchmark extends
 
         for (int i=0; i<numOps; i++) {
             outstanding.acquire();
-            lh[rand.nextInt(lh.length)].asyncAddEntry(msg, callback, System.currentTimeMillis());
+            lh[rand.nextInt(lh.length)].asyncAddEntry(msg, callback, MathUtils.now());
         }
 
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Thu Jul 26 03:22:34 2012
@@ -33,6 +33,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.data.TopicSubscriber;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
@@ -173,7 +175,7 @@ public class FIFODeliveryManager impleme
 
     public void retryErroredSubscriberAfterDelay(ActiveSubscriberState subscriber) {
 
-        subscriber.setLastScanErrorTime(System.currentTimeMillis());
+        subscriber.setLastScanErrorTime(MathUtils.now());
 
         if (!retryQueue.offer(subscriber)) {
             throw new UnexpectedError("Could not enqueue to delivery manager retry queue");
@@ -228,7 +230,7 @@ public class FIFODeliveryManager impleme
     }
 
     protected void retryErroredSubscribers() {
-        long lastInterestingFailureTime = System.currentTimeMillis() - cfg.getScanBackoffPeriodMs();
+        long lastInterestingFailureTime = MathUtils.now() - cfg.getScanBackoffPeriodMs();
         ActiveSubscriberState subscriber;
 
         while ((subscriber = retryQueue.peek()) != null) {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java Thu Jul 26 03:22:34 2012
@@ -18,6 +18,7 @@
 package org.apache.hedwig.server.handlers;
 
 import org.jboss.netty.channel.Channel;
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
@@ -55,7 +56,7 @@ public class PublishHandler extends Base
         Message msgToSerialize = Message.newBuilder(request.getPublishRequest().getMsg()).setSrcRegion(
                                      cfg.getMyRegionByteString()).build();
 
-        final long requestTime = System.currentTimeMillis();
+        final long requestTime = MathUtils.now();
         PersistRequest persistRequest = new PersistRequest(request.getTopic(), msgToSerialize,
         new Callback<Long>() {
             @Override
@@ -67,7 +68,7 @@ public class PublishHandler extends Base
             @Override
             public void operationFinished(Object ctx, Long resultOfOperation) {
                 channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-                pubStats.updateLatency(System.currentTimeMillis() - requestTime);
+                pubStats.updateLatency(MathUtils.now() - requestTime);
             }
         }, null);
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Thu Jul 26 03:22:34 2012
@@ -25,6 +25,8 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFutureListener;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.data.TopicSubscriber;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
@@ -108,7 +110,7 @@ public class SubscribeHandler extends Ba
 
         MessageSeqId lastSeqIdPublished = MessageSeqId.newBuilder(seqId).setLocalComponent(seqId.getLocalComponent()).build();
 
-        final long requestTime = System.currentTimeMillis();
+        final long requestTime = MathUtils.now();
         subMgr.serveSubscribeRequest(topic, subRequest, lastSeqIdPublished, new Callback<MessageSeqId>() {
 
             @Override
@@ -152,7 +154,7 @@ public class SubscribeHandler extends Ba
                 // otherwise the first message might go out before the response
                 // to the subscribe
                 channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-                subStats.updateLatency(System.currentTimeMillis() - requestTime);
+                subStats.updateLatency(MathUtils.now() - requestTime);
 
                 // want to start 1 ahead of the consume ptr
                 MessageSeqId seqIdToStartFrom = MessageSeqId.newBuilder(resultOfOperation).setLocalComponent(

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java Thu Jul 26 03:22:34 2012
@@ -19,6 +19,8 @@ package org.apache.hedwig.server.handler
 
 import org.jboss.netty.channel.Channel;
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
@@ -60,7 +62,7 @@ public class UnsubscribeHandler extends 
         final ByteString topic = request.getTopic();
         final ByteString subscriberId = unsubRequest.getSubscriberId();
 
-        final long requestTime = System.currentTimeMillis();
+        final long requestTime = MathUtils.now();
         subMgr.unsubscribe(topic, subscriberId, new Callback<Void>() {
             @Override
             public void operationFailed(Object ctx, PubSubException exception) {
@@ -72,7 +74,7 @@ public class UnsubscribeHandler extends 
             public void operationFinished(Object ctx, Void resultOfOperation) {
                 deliveryMgr.stopServingSubscriber(topic, subscriberId);
                 channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-                unsubStats.updateLatency(System.currentTimeMillis() - requestTime);
+                unsubStats.updateLatency(MathUtils.now() - requestTime);
             }
         }, null);
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/ServerStats.java Thu Jul 26 03:22:34 2012
@@ -21,11 +21,14 @@ import java.beans.ConstructorProperties;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Server Stats
  */
 public class ServerStats {
+    private static final Logger LOG = LoggerFactory.getLogger(ServerStats.class);
     static ServerStats instance = new ServerStats();
 
     /**
@@ -100,6 +103,14 @@ public class ServerStats {
          * Update Latency
          */
         synchronized public void updateLatency(long latency) {
+            if (latency < 0) {
+                // less than 0ms . Ideally this should not happen.
+                // We have seen this latency negative in some cases due to the
+                // behaviors of JVM. Ignoring the statistics updation for such
+                // cases.
+                LOG.warn("Latency time coming negative");
+                return;
+            }
             totalLatency += latency;
             ++numSuccessOps;
             if (latency < minLatency) {

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=1365873&r1=1365872&r2=1365873&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Thu Jul 26 03:22:34 2012
@@ -35,6 +35,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
@@ -628,7 +630,7 @@ public class ReadAheadCache implements P
         }
 
         public void performRequest() {
-            addMessageToCache(cacheKey, message, System.currentTimeMillis());
+            addMessageToCache(cacheKey, message, MathUtils.now());
         }
 
     }

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java?rev=1365873&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java Thu Jul 26 03:22:34 2012
@@ -0,0 +1,26 @@
+package org.apache.hedwig.server.netty;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.junit.Test;
+
+/** Tests that Statistics updation in hedwig Server */
+public class TestServerStats {
+
+    /**
+     * Tests that updatLatency should not fail with
+     * ArrayIndexOutOfBoundException when latency time coming as negative.
+     */
+    @Test
+    public void testUpdateLatencyShouldNotFailWithAIOBEWithNegativeLatency()
+            throws Exception {
+        ServerStats stats = ServerStats.getInstance();
+        org.apache.hedwig.server.netty.ServerStats.OpStats opStat = stats
+                .getOpStats(OperationType.SUBSCRIBE);
+        opStat.updateLatency(-10);
+        assertEquals("Should not update any latency metrics", 0,
+                opStat.numSuccessOps);
+
+    }
+}

Propchange: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/netty/TestServerStats.java
------------------------------------------------------------------------------
    svn:eol-style = native