You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2015/03/11 20:48:14 UTC

cxf git commit: Add total data read/written to metrics.

Repository: cxf
Updated Branches:
  refs/heads/master e54e39740 -> fcc8bcb38


Add total data read/written to metrics.


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/fcc8bcb3
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/fcc8bcb3
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/fcc8bcb3

Branch: refs/heads/master
Commit: fcc8bcb38f33febf5c73c5f5f56d877272f27e20
Parents: e54e397
Author: Daniel Kulp <dk...@apache.org>
Authored: Wed Mar 11 15:47:46 2015 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Wed Mar 11 15:47:46 2015 -0400

----------------------------------------------------------------------
 .../codahale/CountingInputStream.java           | 76 ++++++++++++++++++++
 .../codahale/CountingOutputStream.java          | 49 +++++++++++++
 .../apache/cxf/management/codahale/Metrics.java | 48 +++++++++++++
 3 files changed, 173 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/fcc8bcb3/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java
----------------------------------------------------------------------
diff --git a/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java
new file mode 100644
index 0000000..84d691d
--- /dev/null
+++ b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.management.codahale;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public final class CountingInputStream extends FilterInputStream {
+
+    private long count;
+    private long mark = -1;
+
+    public CountingInputStream(InputStream in) {
+        super(in);
+    }
+
+    public long getCount() {
+        return count;
+    }
+
+    public int read() throws IOException {
+        int result = in.read();
+        if (result != -1) {
+            count++;
+        }
+        return result;
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        int result = in.read(b, off, len);
+        if (result != -1) {
+            count += result;
+        }
+        return result;
+    }
+
+    public long skip(long n) throws IOException {
+        long result = in.skip(n);
+        count += result;
+        return result;
+    }
+
+    public synchronized void mark(int readlimit) {
+        in.mark(readlimit);
+        mark = count;
+    }
+
+    public synchronized void reset() throws IOException {
+        if (!in.markSupported()) {
+            throw new IOException("Mark not supported");
+        }
+        if (mark == -1) {
+            throw new IOException("Mark not set");
+        }
+
+        in.reset();
+        count = mark;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/fcc8bcb3/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java
----------------------------------------------------------------------
diff --git a/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java
new file mode 100644
index 0000000..bc5cffd
--- /dev/null
+++ b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.management.codahale;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public final class CountingOutputStream extends FilterOutputStream {
+    private long count;
+
+    public CountingOutputStream(OutputStream out) {
+      super(out);
+    }
+    
+    public long getCount() {
+        return count;
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+        count += len;
+    }
+
+    public void write(int b) throws IOException {
+        out.write(b);
+        count++;
+    }
+
+    public void close() throws IOException {
+        out.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/fcc8bcb3/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java
----------------------------------------------------------------------
diff --git a/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java b/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java
index 7bb5517..05830e1 100644
--- a/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java
+++ b/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java
@@ -22,6 +22,8 @@ package org.apache.cxf.management.codahale;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.MalformedObjectNameException;
@@ -29,6 +31,7 @@ import javax.management.ObjectName;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.ObjectNameFactory;
 import com.codahale.metrics.Timer;
@@ -36,6 +39,7 @@ import com.codahale.metrics.Timer;
 import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.interceptor.AttachmentInInterceptor;
+import org.apache.cxf.interceptor.AttachmentOutInterceptor;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.MessageSenderInterceptor;
 import org.apache.cxf.interceptor.ServiceInvokerInterceptor;
@@ -95,11 +99,14 @@ public class Metrics {
         ResponseTimeMessageInInterceptor in = new ResponseTimeMessageInInterceptor();
         ResponseTimeMessageInOneWayInterceptor oneway = new ResponseTimeMessageInOneWayInterceptor();
         ResponseTimeMessageOutInterceptor out = new ResponseTimeMessageOutInterceptor();
+        CountingOutInterceptor countingOut = new CountingOutInterceptor();
         //ResponseTimeMessageInvokerInterceptor invoker = new ResponseTimeMessageInvokerInterceptor();
         
         bus.getInInterceptors().add(in);
         bus.getInInterceptors().add(oneway);
+        bus.getOutInterceptors().add(countingOut);
         bus.getOutInterceptors().add(out);
+        bus.getOutFaultInterceptors().add(countingOut);
         bus.getOutFaultInterceptors().add(out);
         //bus.setExtension(this, CounterRepository.class); 
         
@@ -115,6 +122,8 @@ public class Metrics {
         Timer checkedApplicationFaults;
         Timer runtimeFaults;
         Timer logicalRuntimeFaults;
+        Meter incomingData;
+        Meter outgoingData;
      
         Context start() {
             inFlight.inc();
@@ -209,6 +218,8 @@ public class Metrics {
             ti.runtimeFaults = registry.timer(baseName + "Attribute=Runtime Faults");
             ti.logicalRuntimeFaults = registry.timer(baseName + "Attribute=Logical Runtime Faults");
             ti.inFlight = registry.counter(baseName + "Attribute=In Flight");
+            ti.incomingData = registry.meter(baseName + "Attribute=Data Read");
+            ti.outgoingData = registry.meter(baseName + "Attribute=Data Written");
             endpoint.put(TimerInfo.class.getName(), ti);
             endpoint.addCleanupHook(new Closeable() {
                 public void close() throws IOException {
@@ -219,6 +230,8 @@ public class Metrics {
                         registry.remove(baseName + "Attribute=Runtime Faults");
                         registry.remove(baseName + "Attribute=Logical Runtime Faults");
                         registry.remove(baseName + "Attribute=In Flight");
+                        registry.remove(baseName + "Attribute=Data Read");
+                        registry.remove(baseName + "Attribute=Data Written");
                         endpoint.remove(TimerInfo.class.getName());
                         System.out.println(endpoint.getBinding().getBindingInfo().getOperations());
                         for (BindingOperationInfo boi : endpoint.getBinding().getBindingInfo().getOperations()) {
@@ -264,6 +277,15 @@ public class Metrics {
         BindingOperationInfo bi = m.getExchange().getBindingOperationInfo();
         FaultMode fm = m.getExchange().get(FaultMode.class);
         TimerInfo op = null;
+        CountingInputStream in = m.getExchange().get(CountingInputStream.class);
+        if (in != null) {
+            ctx.info.incomingData.mark(in.getCount());
+        }
+        CountingOutputStream out = m.getExchange().get(CountingOutputStream.class);
+        if (out != null) {
+            ctx.info.outgoingData.mark(out.getCount());
+        }
+
         if (bi != null) {
             op = getTimerInfo(m, bi);
             op.totals.update(l, TimeUnit.NANOSECONDS);
@@ -300,6 +322,12 @@ public class Metrics {
             } else {
                 TimerInfo.Context ctx = ti.start();
                 message.getExchange().put(TimerInfo.Context.class, ctx);
+                InputStream in = message.getContent(InputStream.class);
+                if (in != null) {
+                    CountingInputStream newIn = new CountingInputStream(in);
+                    message.setContent(InputStream.class, newIn);
+                    message.getExchange().put(CountingInputStream.class, newIn);
+                }
             }
         }
         public void handleFault(Message message) {
@@ -308,6 +336,26 @@ public class Metrics {
             }
         }
     };
+    
+    class CountingOutInterceptor extends AbstractPhaseInterceptor<Message> {
+        public CountingOutInterceptor() {
+            super(Phase.PRE_STREAM);
+            addBefore(AttachmentOutInterceptor.class.getName());
+        }
+        public void handleMessage(Message message) throws Fault {
+            if (isRequestor(message)) {
+                //
+            } else {
+                OutputStream out = message.getContent(OutputStream.class);
+                if (out != null) {
+                    CountingOutputStream newOut = new CountingOutputStream(out);
+                    message.setContent(OutputStream.class, newOut);
+                    message.getExchange().put(CountingOutputStream.class, newOut);
+                }
+               
+            }
+        }    
+    };
 
     class ResponseTimeMessageOutInterceptor extends AbstractPhaseInterceptor<Message> {
         public ResponseTimeMessageOutInterceptor() {