You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2015/03/11 20:48:14 UTC
cxf git commit: Add total data read/written to metrics.
Repository: cxf
Updated Branches:
refs/heads/master e54e39740 -> fcc8bcb38
Add total data read/written to metrics.
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/fcc8bcb3
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/fcc8bcb3
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/fcc8bcb3
Branch: refs/heads/master
Commit: fcc8bcb38f33febf5c73c5f5f56d877272f27e20
Parents: e54e397
Author: Daniel Kulp <dk...@apache.org>
Authored: Wed Mar 11 15:47:46 2015 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Wed Mar 11 15:47:46 2015 -0400
----------------------------------------------------------------------
.../codahale/CountingInputStream.java | 76 ++++++++++++++++++++
.../codahale/CountingOutputStream.java | 49 +++++++++++++
.../apache/cxf/management/codahale/Metrics.java | 48 +++++++++++++
3 files changed, 173 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/fcc8bcb3/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java
----------------------------------------------------------------------
diff --git a/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java
new file mode 100644
index 0000000..84d691d
--- /dev/null
+++ b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingInputStream.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.management.codahale;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public final class CountingInputStream extends FilterInputStream {
+
+ private long count;
+ private long mark = -1;
+
+ public CountingInputStream(InputStream in) {
+ super(in);
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public int read() throws IOException {
+ int result = in.read();
+ if (result != -1) {
+ count++;
+ }
+ return result;
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException {
+ int result = in.read(b, off, len);
+ if (result != -1) {
+ count += result;
+ }
+ return result;
+ }
+
+ public long skip(long n) throws IOException {
+ long result = in.skip(n);
+ count += result;
+ return result;
+ }
+
+ public synchronized void mark(int readlimit) {
+ in.mark(readlimit);
+ mark = count;
+ }
+
+ public synchronized void reset() throws IOException {
+ if (!in.markSupported()) {
+ throw new IOException("Mark not supported");
+ }
+ if (mark == -1) {
+ throw new IOException("Mark not set");
+ }
+
+ in.reset();
+ count = mark;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/fcc8bcb3/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java
----------------------------------------------------------------------
diff --git a/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java
new file mode 100644
index 0000000..bc5cffd
--- /dev/null
+++ b/rt/management/src/main/java/org/apache/cxf/management/codahale/CountingOutputStream.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.management.codahale;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public final class CountingOutputStream extends FilterOutputStream {
+ private long count;
+
+ public CountingOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ count += len;
+ }
+
+ public void write(int b) throws IOException {
+ out.write(b);
+ count++;
+ }
+
+ public void close() throws IOException {
+ out.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/fcc8bcb3/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java
----------------------------------------------------------------------
diff --git a/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java b/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java
index 7bb5517..05830e1 100644
--- a/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java
+++ b/rt/management/src/main/java/org/apache/cxf/management/codahale/Metrics.java
@@ -22,6 +22,8 @@ package org.apache.cxf.management.codahale;
import java.io.Closeable;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import javax.management.MalformedObjectNameException;
@@ -29,6 +31,7 @@ import javax.management.ObjectName;
import com.codahale.metrics.Counter;
import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ObjectNameFactory;
import com.codahale.metrics.Timer;
@@ -36,6 +39,7 @@ import com.codahale.metrics.Timer;
import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.interceptor.AttachmentInInterceptor;
+import org.apache.cxf.interceptor.AttachmentOutInterceptor;
import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.MessageSenderInterceptor;
import org.apache.cxf.interceptor.ServiceInvokerInterceptor;
@@ -95,11 +99,14 @@ public class Metrics {
ResponseTimeMessageInInterceptor in = new ResponseTimeMessageInInterceptor();
ResponseTimeMessageInOneWayInterceptor oneway = new ResponseTimeMessageInOneWayInterceptor();
ResponseTimeMessageOutInterceptor out = new ResponseTimeMessageOutInterceptor();
+ CountingOutInterceptor countingOut = new CountingOutInterceptor();
//ResponseTimeMessageInvokerInterceptor invoker = new ResponseTimeMessageInvokerInterceptor();
bus.getInInterceptors().add(in);
bus.getInInterceptors().add(oneway);
+ bus.getOutInterceptors().add(countingOut);
bus.getOutInterceptors().add(out);
+ bus.getOutFaultInterceptors().add(countingOut);
bus.getOutFaultInterceptors().add(out);
//bus.setExtension(this, CounterRepository.class);
@@ -115,6 +122,8 @@ public class Metrics {
Timer checkedApplicationFaults;
Timer runtimeFaults;
Timer logicalRuntimeFaults;
+ Meter incomingData;
+ Meter outgoingData;
Context start() {
inFlight.inc();
@@ -209,6 +218,8 @@ public class Metrics {
ti.runtimeFaults = registry.timer(baseName + "Attribute=Runtime Faults");
ti.logicalRuntimeFaults = registry.timer(baseName + "Attribute=Logical Runtime Faults");
ti.inFlight = registry.counter(baseName + "Attribute=In Flight");
+ ti.incomingData = registry.meter(baseName + "Attribute=Data Read");
+ ti.outgoingData = registry.meter(baseName + "Attribute=Data Written");
endpoint.put(TimerInfo.class.getName(), ti);
endpoint.addCleanupHook(new Closeable() {
public void close() throws IOException {
@@ -219,6 +230,8 @@ public class Metrics {
registry.remove(baseName + "Attribute=Runtime Faults");
registry.remove(baseName + "Attribute=Logical Runtime Faults");
registry.remove(baseName + "Attribute=In Flight");
+ registry.remove(baseName + "Attribute=Data Read");
+ registry.remove(baseName + "Attribute=Data Written");
endpoint.remove(TimerInfo.class.getName());
System.out.println(endpoint.getBinding().getBindingInfo().getOperations());
for (BindingOperationInfo boi : endpoint.getBinding().getBindingInfo().getOperations()) {
@@ -264,6 +277,15 @@ public class Metrics {
BindingOperationInfo bi = m.getExchange().getBindingOperationInfo();
FaultMode fm = m.getExchange().get(FaultMode.class);
TimerInfo op = null;
+ CountingInputStream in = m.getExchange().get(CountingInputStream.class);
+ if (in != null) {
+ ctx.info.incomingData.mark(in.getCount());
+ }
+ CountingOutputStream out = m.getExchange().get(CountingOutputStream.class);
+ if (out != null) {
+ ctx.info.outgoingData.mark(out.getCount());
+ }
+
if (bi != null) {
op = getTimerInfo(m, bi);
op.totals.update(l, TimeUnit.NANOSECONDS);
@@ -300,6 +322,12 @@ public class Metrics {
} else {
TimerInfo.Context ctx = ti.start();
message.getExchange().put(TimerInfo.Context.class, ctx);
+ InputStream in = message.getContent(InputStream.class);
+ if (in != null) {
+ CountingInputStream newIn = new CountingInputStream(in);
+ message.setContent(InputStream.class, newIn);
+ message.getExchange().put(CountingInputStream.class, newIn);
+ }
}
}
public void handleFault(Message message) {
@@ -308,6 +336,26 @@ public class Metrics {
}
}
};
+
+ class CountingOutInterceptor extends AbstractPhaseInterceptor<Message> {
+ public CountingOutInterceptor() {
+ super(Phase.PRE_STREAM);
+ addBefore(AttachmentOutInterceptor.class.getName());
+ }
+ public void handleMessage(Message message) throws Fault {
+ if (isRequestor(message)) {
+ //
+ } else {
+ OutputStream out = message.getContent(OutputStream.class);
+ if (out != null) {
+ CountingOutputStream newOut = new CountingOutputStream(out);
+ message.setContent(OutputStream.class, newOut);
+ message.getExchange().put(CountingOutputStream.class, newOut);
+ }
+
+ }
+ }
+ };
class ResponseTimeMessageOutInterceptor extends AbstractPhaseInterceptor<Message> {
public ResponseTimeMessageOutInterceptor() {