You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/10/22 02:03:23 UTC
[01/10] git commit: Created MXBean for counts and deprecated
DatumStatusCounter and DatumStatusCountable in core
Repository: incubator-streams
Updated Branches:
refs/heads/master 849a2e8f9 -> 9b3227141
Created MXBean for counts and deprecated DatumStatusCounter and DatumStatusCountable in core
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c030d439
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c030d439
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c030d439
Branch: refs/heads/master
Commit: c030d4393ab84fbd1cf484bf8ea06a7902a26964
Parents: 2a635df
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 17 17:13:51 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 17 17:13:51 2014 -0500
----------------------------------------------------------------------
.../streams/core/DatumStatusCountable.java | 1 +
.../apache/streams/core/DatumStatusCounter.java | 2 +-
.../local/counters/DatumStatusCounter.java | 88 +++++++++++
.../counters/DatumStatusCounterMXBean.java | 43 +++++
.../local/counters/StreamsTaskCounter.java | 101 ++++++++++++
.../counters/StreamsTaskCounterMXBean.java | 53 +++++++
...amOnUnhandleThrowableThreadPoolExecutor.java | 17 ++
.../local/counters/DatumStatusCounterTest.java | 134 ++++++++++++++++
.../local/counters/StreamsTaskCounterTest.java | 158 +++++++++++++++++++
9 files changed, 596 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
index 6a9da4d..3abf30d 100644
--- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
@@ -21,6 +21,7 @@ package org.apache.streams.core;
/**
* Created by steveblackmon on 3/24/14.
*/
+@Deprecated
public interface DatumStatusCountable {
public DatumStatusCounter getDatumStatusCounter();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
index 1bba688..2758ada 100644
--- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
@@ -17,7 +17,7 @@
*/
package org.apache.streams.core;
-
+@Deprecated
public class DatumStatusCounter
{
private volatile int attempted = 0;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
new file mode 100644
index 0000000..88c3a6f
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import net.jcip.annotations.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread-safe counter of passed and failed datums, exposed over JMX as an MXBean.
+ */
+@ThreadSafe
+public class DatumStatusCounter implements DatumStatusCounterMXBean{
+
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s";
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatumStatusCounter.class);
+
+    private final AtomicLong failed;
+    private final AtomicLong passed;
+    /** Registers this counter with the platform MBeanServer as NAME_TEMPLATE formatted with {@code id}; a registration failure is logged and rethrown as RuntimeException. */
+    public DatumStatusCounter(String id) {
+        this.failed = new AtomicLong(0);
+        this.passed = new AtomicLong(0);
+        try {
+            ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(this, name);
+        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+            LOGGER.error("Failed to register MXBean : {}", id, e); // id fills the placeholder; e is passed last as the throwable
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void incrementFailedCount() {
+        this.incrementFailedCount(1);
+    }
+
+    public void incrementFailedCount(long delta) {
+        this.failed.addAndGet(delta);
+    }
+
+    public void incrementPassedCount() {
+        this.incrementPassedCount(1);
+    }
+
+    public void incrementPassedCount(long delta) {
+        this.passed.addAndGet(delta);
+    }
+
+    @Override
+    public double getFailRate() {
+        long failedCount = this.failed.get();
+        long total = failedCount + this.passed.get(); // each counter is read once so the guard and the divisor agree
+        if(total == 0) {
+            return 0.0;
+        }
+        return (double) failedCount / total;
+    }
+
+    @Override
+    public long getNumFailed() {
+        return this.failed.get();
+    }
+
+    @Override
+    public long getNumPassed() {
+        return this.passed.get();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
new file mode 100644
index 0000000..7cc8df4
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+/**
+ * JMX management view of a datum pass/fail counter.
+ */
+public interface DatumStatusCounterMXBean {
+
+    /**
+     * Reports how many datums have failed.
+     * @return the failed-datum count
+     */
+    long getNumFailed();
+
+    /**
+     * Reports how many datums have passed.
+     * @return the passed-datum count
+     */
+    long getNumPassed();
+
+    /**
+     * Reports the failure rate: failed count divided by (passed count + failed count).
+     * @return the failure rate
+     */
+    double getFailRate();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
new file mode 100644
index 0000000..ffd9f25
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import net.jcip.annotations.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread-safe counter of datums emitted, datums received and unhandled errors, exposed over JMX as an MXBean.
+ */
+@ThreadSafe
+public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
+
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s";
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class);
+
+    private final AtomicLong emitted;
+    private final AtomicLong received;
+    private final AtomicLong errors;
+    /** Registers this counter with the platform MBeanServer as NAME_TEMPLATE formatted with {@code id}; a registration failure is logged and rethrown as RuntimeException. */
+    public StreamsTaskCounter(String id) {
+        this.emitted = new AtomicLong(0);
+        this.received = new AtomicLong(0);
+        this.errors = new AtomicLong(0);
+        try {
+            ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(this, name);
+        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+            LOGGER.error("Failed to register MXBean : {}", id, e); // id fills the placeholder; e is passed last as the throwable
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void incrementEmittedCount() {
+        this.incrementEmittedCount(1);
+    }
+
+    public void incrementEmittedCount(long delta) {
+        this.emitted.addAndGet(delta);
+    }
+
+    public void incrementErrorCount() {
+        this.incrementErrorCount(1);
+    }
+
+    public void incrementErrorCount(long delta) {
+        this.errors.addAndGet(delta);
+    }
+
+    public void incrementReceivedCount() {
+        this.incrementReceivedCount(1);
+    }
+
+    public void incrementReceivedCount(long delta) {
+        this.received.addAndGet(delta);
+    }
+
+    @Override
+    public double getErrorRate() {
+        // Read received once so the zero guard and the divisor see the same value.
+        long receivedCount = this.received.get();
+        // Nothing received: report 0.0 rather than dividing by zero.
+        return receivedCount == 0 ? 0.0 : (double) this.errors.get() / (double) receivedCount;
+    }
+
+    @Override
+    public long getNumEmitted() {
+        return this.emitted.get();
+    }
+
+    @Override
+    public long getNumReceived() {
+        return this.received.get();
+    }
+
+    @Override
+    public long getNumUnhandledErrors() {
+        return this.errors.get();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
new file mode 100644
index 0000000..634857d
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+/**
+ * JMX management view of the counters kept for a streams process.
+ */
+public interface StreamsTaskCounterMXBean {
+
+    /**
+     * Reports the error rate of the streams process: the number of errors not handled by the
+     * {@link org.apache.streams.local.tasks.StreamsTask} divided by the number of datums received.
+     * @return the error rate
+     */
+    double getErrorRate();
+
+    /**
+     * Reports how many {@link org.apache.streams.core.StreamsDatum}s the streams process has emitted.
+     * @return the emitted-datum count
+     */
+    long getNumEmitted();
+
+    /**
+     * Reports how many {@link org.apache.streams.core.StreamsDatum}s the streams process has received.
+     * @return the received-datum count
+     */
+    long getNumReceived();
+
+    /**
+     * Reports how many errors the process had to catch because the executing
+     * Provider/Processor/Writer did not catch and handle the exception.
+     * @return the unhandled-error count
+     */
+    long getNumUnhandledErrors();
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
index 55a26e1..ea65ac2 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.local.executors;
import org.apache.streams.local.builders.LocalStreamBuilder;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
new file mode 100644
index 0000000..3a9a8dc
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.After;
+import org.junit.Test;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
+/**
+ * Unit tests for {@link org.apache.streams.local.counters.DatumStatusCounter}
+ */
+public class DatumStatusCounterTest extends RandomizedTest {
+
+    private static final String MBEAN_ID = "test_id";
+
+    /**
+     * Remove registered mbeans from previous tests
+     * @throws Exception
+     */
+    @After
+    public void unregisterMXBean() throws Exception {
+        try {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID)));
+        } catch (InstanceNotFoundException ife) {
+            //No-op
+        }
+    }
+
+    /**
+     * Test Constructor can register the counter as an mxbean without throwing an exception.
+     */
+    @Test
+    public void testConstructor() {
+        try {
+            new DatumStatusCounter(MBEAN_ID);
+        } catch (Throwable t) {
+            fail("Constructor Threw Exception : "+t.getMessage());
+        }
+    }
+
+    /**
+     * Test that you can increment passes and it returns the correct count
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testPassed() throws Exception {
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementPassedCount();
+        }
+        assertEquals(numIncrements, counter.getNumPassed());
+
+        unregisterMXBean();
+
+        counter = new DatumStatusCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementPassedCount(delta);
+        }
+        assertEquals(total, counter.getNumPassed());
+    }
+
+    /**
+     * Test that you can increment failed and it returns the correct count
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testFailed() throws Exception {
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementFailedCount();
+        }
+        assertEquals(numIncrements, counter.getNumFailed());
+
+        unregisterMXBean();
+
+        counter = new DatumStatusCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementFailedCount(delta);
+        }
+        assertEquals(total, counter.getNumFailed());
+    }
+
+
+    /**
+     * Test failure rate returns expected values
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testFailureRate() {
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        assertEquals(0.0, counter.getFailRate(), 0);
+        int failures = randomIntBetween(0, 100000);
+        int passes = randomIntBetween(0, 100000);
+        counter.incrementPassedCount(passes);
+        counter.incrementFailedCount(failures);
+        // Both draws can be 0; getFailRate() reports 0.0 then, while 0.0 / 0.0 would be NaN.
+        double expected = (passes + failures == 0) ? 0.0 : (double)failures / (double)(passes + failures);
+        assertEquals(expected, counter.getFailRate(), 0);
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
new file mode 100644
index 0000000..a001845
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.After;
+import org.junit.Test;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
+/**
+ * Unit tests for {@link org.apache.streams.local.counters.StreamsTaskCounter}
+ */
+public class StreamsTaskCounterTest extends RandomizedTest {
+
+    private static final String MBEAN_ID = "test_id";
+
+    /**
+     * Remove registered mbeans from previous tests
+     * @throws Exception
+     */
+    @After
+    public void unregisterMXBean() throws Exception {
+        try {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID)));
+        } catch (InstanceNotFoundException ife) {
+            //No-op
+        }
+    }
+
+    /**
+     * Test constructor does not throw errors
+     */
+    @Test
+    public void testConstructor() {
+        try {
+            new StreamsTaskCounter(MBEAN_ID);
+        } catch (Throwable t) {
+            fail("Constructor threw error : "+t.getMessage());
+        }
+    }
+
+    /**
+     * Test emitted increments correctly and returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testEmitted() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementEmittedCount();
+        }
+        assertEquals(numIncrements, counter.getNumEmitted());
+
+        unregisterMXBean();
+
+        counter = new StreamsTaskCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementEmittedCount(delta);
+        }
+        assertEquals(total, counter.getNumEmitted());
+    }
+
+    /**
+     * Test received increments correctly and returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testReceived() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementReceivedCount();
+        }
+        assertEquals(numIncrements, counter.getNumReceived());
+
+        unregisterMXBean();
+
+        counter = new StreamsTaskCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementReceivedCount(delta);
+        }
+        assertEquals(total, counter.getNumReceived());
+    }
+
+    /**
+     * Test errors increments correctly and returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testError() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementErrorCount();
+        }
+        assertEquals(numIncrements, counter.getNumUnhandledErrors());
+
+        unregisterMXBean();
+
+        counter = new StreamsTaskCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementErrorCount(delta);
+        }
+        assertEquals(total, counter.getNumUnhandledErrors());
+    }
+
+    /**
+     * Test error rate returns expected value; when 0 datums are received the counter reports 0.0, not a division by zero
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testErrorRate() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        assertEquals(0.0, counter.getErrorRate(), 0);
+        int failures = randomIntBetween(0, 100000);
+        int received = randomIntBetween(0, 100000);
+        counter.incrementReceivedCount(received);
+        counter.incrementErrorCount(failures);
+        double expected = received == 0 ? 0.0 : (double)failures / (double)(received);
+        assertEquals(expected, counter.getErrorRate(), 0);
+    }
+}
[06/10] git commit: STREAMS-197 | Avg time now will count for
providers and does not include errors in calculations
Posted by re...@apache.org.
STREAMS-197 | Avg time now will count for providers and does not include errors in calculations
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f40ce9b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f40ce9b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f40ce9b0
Branch: refs/heads/master
Commit: f40ce9b0db7bc082ce98722529f69881d85f3c8f
Parents: bfffc5f
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 20 16:00:26 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 20 16:00:26 2014 -0500
----------------------------------------------------------------------
.../apache/streams/local/counters/StreamsTaskCounter.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f40ce9b0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index c93cae9..8801df2 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -145,10 +145,15 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
@Override
public double getAvgTime() {
- if(this.received.get() == 0) {
+ long rec = this.received.get();
+ long emit = this.emitted.get();
+ if(rec == 0 && emit == 0 ) {
return 0.0;
+ } else if( rec == 0) { //provider instance
+ return this.totalTime.get() / (double) emit;
+ } else {
+ return this.totalTime.get() / ((double) this.received.get() - this.errors.get());
}
- return this.totalTime.get() / (double) this.received.get();
}
@Override
[10/10] git commit: STREAMS-197 | Added imports left out merge
Posted by re...@apache.org.
STREAMS-197 | Added imports left out merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9b322714
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9b322714
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9b322714
Branch: refs/heads/master
Commit: 9b32271419abdefda0bfd9c19f3614b223d7ca63
Parents: f958890
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Tue Oct 21 18:18:45 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Tue Oct 21 18:18:45 2014 -0500
----------------------------------------------------------------------
.../src/main/java/org/apache/streams/util/ComponentUtils.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9b322714/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 169fe91..d62182f 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -22,8 +22,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+import javax.management.*;
import java.lang.management.ManagementFactory;
import java.util.Queue;
import java.util.Set;
[05/10] git commit: Changed NAME_TEMPLATE
Posted by re...@apache.org.
Changed NAME_TEMPLATE
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bfffc5f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bfffc5f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bfffc5f7
Branch: refs/heads/master
Commit: bfffc5f7261b9117dd33bab7259e95e6de1f5e26
Parents: d305371
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 20 15:32:13 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 20 15:32:13 2014 -0500
----------------------------------------------------------------------
.../java/org/apache/streams/local/counters/StreamsTaskCounter.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bfffc5f7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index a0eb349..c93cae9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
@ThreadSafe
public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
- public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s";
+ public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s";
private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class);
private AtomicLong emitted;
[04/10] git commit: Merge branch 'master' into datumcountable
Posted by re...@apache.org.
Merge branch 'master' into datumcountable
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d3053713
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d3053713
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d3053713
Branch: refs/heads/master
Commit: d305371364b2340f26a66618aa614ca6f3ce6ee4
Parents: 393c2e7 6ecf46d
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 20 12:53:31 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 20 12:53:31 2014 -0500
----------------------------------------------------------------------
.../local/builders/LocalStreamBuilder.java | 58 +--
.../streams/local/builders/StreamComponent.java | 21 +-
.../streams/local/tasks/BaseStreamsTask.java | 52 ++-
.../streams/local/tasks/StreamsMergeTask.java | 9 +-
.../local/tasks/StreamsPersistWriterTask.java | 43 +-
.../local/tasks/StreamsProcessorTask.java | 52 ++-
.../local/tasks/StreamsProviderTask.java | 8 +-
.../apache/streams/local/tasks/StreamsTask.java | 14 +-
.../local/builders/LocalStreamBuilderTest.java | 399 ++++++++++++-------
.../local/builders/ToyLocalBuilderExample.java | 5 +-
.../streams/local/tasks/BasicTasksTest.java | 52 +--
.../local/tasks/StreamsProviderTaskTest.java | 7 +-
.../PassthroughDatumCounterProcessor.java | 51 ++-
.../test/providers/NumericMessageProvider.java | 29 +-
.../local/test/writer/DatumCounterWriter.java | 61 ++-
15 files changed, 569 insertions(+), 292 deletions(-)
----------------------------------------------------------------------
[08/10] git commit: Merge pull request #10 from apache/master
Posted by re...@apache.org.
Merge pull request #10 from apache/master
Merge Apache Master 2014/10/21
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2c715505
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2c715505
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2c715505
Branch: refs/heads/master
Commit: 2c71550570dfeb20fe62a44b20e53b2a03168b73
Parents: 6a8ed61 849a2e8
Author: Ryan Ebanks <rb...@users.noreply.github.com>
Authored: Tue Oct 21 17:57:32 2014 -0500
Committer: Ryan Ebanks <rb...@users.noreply.github.com>
Committed: Tue Oct 21 17:57:32 2014 -0500
----------------------------------------------------------------------
.../streams/local/tasks/StreamsProcessorTask.java | 2 +-
.../local/builders/LocalStreamBuilderTest.java | 12 ++++++++++++
...nUnhandledThrowableThreadPoolExecutorTest.java | 11 +++++++++++
.../queues/ThroughputQueueMulitThreadTest.java | 10 ++++++++++
.../queues/ThroughputQueueSingleThreadTest.java | 11 +++++++++++
.../streams/local/tasks/BasicTasksTest.java | 10 ++++++++++
.../local/tasks/StreamsProviderTaskTest.java | 10 ++++++++++
.../tests/TestComponentsLocalStream.java | 13 ++++++++++++-
.../tests/TestExpectedDatumsPersitWriter.java | 13 ++++++++++++-
.../component/tests/TestFileReaderProvider.java | 12 +++++++++++-
.../org/apache/streams/util/ComponentUtils.java | 18 ++++++++++++++++++
11 files changed, 118 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
[09/10] git commit: STREAMS-197 | Merged apache master and handle
merge conflict
Posted by re...@apache.org.
STREAMS-197 | Merged apache master and handle merge conflict
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f9588905
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f9588905
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f9588905
Branch: refs/heads/master
Commit: f9588905deeaa4bed0e3bad0945c626202e4c211
Parents: 0006a04 2c71550
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Tue Oct 21 18:00:34 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Tue Oct 21 18:00:34 2014 -0500
----------------------------------------------------------------------
.../streams-persist-elasticsearch/pom.xml | 47 +++++++
.../ElasticsearchMetadataUtil.java | 106 ++++++++++++++++
.../ElasticsearchPersistDeleter.java | 6 +-
.../ElasticsearchPersistUpdater.java | 6 +-
.../ElasticsearchPersistWriter.java | 47 +------
.../DatumFromMetadataAsDocumentProcessor.java | 122 +++++++++++++++++++
.../processor/DatumFromMetadataProcessor.java | 103 ++++++++++++++++
.../processor/DocumentToMetadataProcessor.java | 100 +++++++++++++++
.../ElasticsearchConfiguration.json | 3 +-
.../test/TestDatumFromMetadataProcessor.java | 81 ++++++++++++
.../test/TestDocumentToMetadataProcessor.java | 63 ++++++++++
.../test/TestElasticsearchPersistWriter.java | 70 +++++++++++
.../provider/TwitterTimelineProvider.java | 14 ++-
.../provider/TwitterTimelineProviderTest.java | 39 ++++++
.../local/tasks/StreamsProcessorTask.java | 2 +-
.../local/builders/LocalStreamBuilderTest.java | 12 ++
...nhandledThrowableThreadPoolExecutorTest.java | 11 ++
.../queues/ThroughputQueueMulitThreadTest.java | 10 ++
.../queues/ThroughputQueueSingleThreadTest.java | 11 ++
.../streams/local/tasks/BasicTasksTest.java | 10 ++
.../local/tasks/StreamsProviderTaskTest.java | 10 ++
.../tests/TestComponentsLocalStream.java | 13 +-
.../tests/TestExpectedDatumsPersitWriter.java | 13 +-
.../component/tests/TestFileReaderProvider.java | 12 +-
.../org/apache/streams/util/ComponentUtils.java | 18 ++-
25 files changed, 868 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9588905/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --cc streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index bb65c4c,9f73560..169fe91
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@@ -108,19 -110,17 +110,33 @@@ public class ComponentUtils
}
/**
+ * Removes all MBeans registered under a specific domain. Made specifically for cleaning up in unit tests
+ * @param domain
+ */
+ public static void removeAllMBeansOfDomain(String domain) throws Exception {
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ domain = domain.endsWith(":") ? domain : domain+":";
+ ObjectName objectName = new ObjectName(domain+"*");
+ Set<ObjectName> mbeanNames = mbs.queryNames(objectName, null);
+ for(ObjectName name : mbeanNames) {
+ mbs.unregisterMBean(name);
+ }
+ }
+
++ /**
+ * Attempts to register an object with local MBeanServer. Throws runtime exception on errors.
+ * @param name name to register bean with
+ * @param mbean mbean to register
+ */
+ public static <V> void registerLocalMBean(String name, V mbean) {
+ try {
+ ObjectName objectName = new ObjectName(name);
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(mbean, objectName);
+ } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+ LOGGER.error("Failed to register MXBean : {}", e);
+ throw new RuntimeException(e);
+ }
+ }
+
}
[03/10] git commit: Added java doc
Posted by re...@apache.org.
Added java doc
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/393c2e79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/393c2e79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/393c2e79
Branch: refs/heads/master
Commit: 393c2e79b2534bb7f8ba344b6418488c84338c87
Parents: 6916a1b
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 17 18:03:04 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 17 18:03:04 2014 -0500
----------------------------------------------------------------------
.../org/apache/streams/local/counters/StreamsTaskCounter.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/393c2e79/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index e864219..a0eb349 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -42,6 +42,10 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
@GuardedBy("this")
private volatile long maxTime;
+ /**
+ *
+ * @param id
+ */
public StreamsTaskCounter(String id) {
this.emitted = new AtomicLong(0);
this.received = new AtomicLong(0);
[07/10] git commit: STREAMS-197 | add util function and refactored
Posted by re...@apache.org.
STREAMS-197 | add util function and refactored
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0006a044
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0006a044
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0006a044
Branch: refs/heads/master
Commit: 0006a044f001a68ba8ceb481ff4fbcb0ba0de270
Parents: f40ce9b
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Tue Oct 21 17:26:35 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Tue Oct 21 17:26:35 2014 -0500
----------------------------------------------------------------------
streams-runtimes/streams-runtime-local/pom.xml | 6 +++---
.../local/counters/DatumStatusCounter.java | 15 +++++----------
.../local/counters/StreamsTaskCounter.java | 10 ++--------
.../org/apache/streams/util/ComponentUtils.java | 18 ++++++++++++++++++
4 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index 0d9138d..9ee8d5b 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -46,17 +46,17 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
- <version>0.1-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
- <version>0.1-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.1-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
index 88c3a6f..acada71 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
@@ -18,6 +18,7 @@
package org.apache.streams.local.counters;
import net.jcip.annotations.ThreadSafe;
+import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,14 +41,7 @@ public class DatumStatusCounter implements DatumStatusCounterMXBean{
public DatumStatusCounter(String id) {
this.failed = new AtomicLong(0);
this.passed = new AtomicLong(0);
- try {
- ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, name);
- } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
- LOGGER.error("Failed to register MXBean : {}", e);
- throw new RuntimeException(e);
- }
+ ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this);
}
public void incrementFailedCount() {
@@ -70,10 +64,11 @@ public class DatumStatusCounter implements DatumStatusCounterMXBean{
@Override
public double getFailRate() {
double failed = this.failed.get();
- if(failed == 0.0 && this.passed.get() == 0) {
+ double passed = this.passed.get();
+ if(failed == 0.0 && passed == 0) {
return 0.0;
}
- return failed / (this.passed.get() + failed);
+ return failed / (passed + failed);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index 8801df2..68c6364 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -19,6 +19,7 @@ package org.apache.streams.local.counters;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
+import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,14 +53,7 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
this.errors = new AtomicLong(0);
this.totalTime = new AtomicLong(0);
this.maxTime = -1;
- try {
- ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, name);
- } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
- LOGGER.error("Failed to register MXBean : {}", e);
- throw new RuntimeException(e);
- }
+ ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 9f3c480..bb65c4c 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -22,6 +22,8 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.*;
+import java.lang.management.ManagementFactory;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -105,4 +107,20 @@ public class ComponentUtils {
}
}
+ /**
+ * Attempts to register an object with local MBeanServer. Throws runtime exception on errors.
+ * @param name name to register bean with
+ * @param mbean mbean to register
+ */
+ public static <V> void registerLocalMBean(String name, V mbean) {
+ try {
+ ObjectName objectName = new ObjectName(name);
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbs.registerMBean(mbean, objectName);
+ } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+ LOGGER.error("Failed to register MXBean : {}", e);
+ throw new RuntimeException(e);
+ }
+ }
+
}
[02/10] git commit: Added processing time
Posted by re...@apache.org.
Added processing time
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6916a1b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6916a1b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6916a1b7
Branch: refs/heads/master
Commit: 6916a1b78318f7dec9d73121e25e1f6b545c9a78
Parents: c030d43
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 17 17:53:34 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 17 17:53:34 2014 -0500
----------------------------------------------------------------------
.../local/counters/StreamsTaskCounter.java | 53 ++++++++++++++++++++
.../counters/StreamsTaskCounterMXBean.java | 10 ++++
2 files changed, 63 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6916a1b7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index ffd9f25..e864219 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -17,6 +17,7 @@
*/
package org.apache.streams.local.counters;
+import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,11 +38,16 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
private AtomicLong emitted;
private AtomicLong received;
private AtomicLong errors;
+ private AtomicLong totalTime;
+ @GuardedBy("this")
+ private volatile long maxTime;
public StreamsTaskCounter(String id) {
this.emitted = new AtomicLong(0);
this.received = new AtomicLong(0);
this.errors = new AtomicLong(0);
+ this.totalTime = new AtomicLong(0);
+ this.maxTime = -1;
try {
ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -52,30 +58,64 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
}
}
+ /**
+ * Increment emitted count
+ */
public void incrementEmittedCount() {
this.incrementEmittedCount(1);
}
+ /**
+ * Increment emitted count
+ * @param delta
+ */
public void incrementEmittedCount(long delta) {
this.emitted.addAndGet(delta);
}
+ /**
+ * Increment error count
+ */
public void incrementErrorCount() {
this.incrementErrorCount(1);
}
+ /**
+ * Increment error count
+ * @param delta
+ */
public void incrementErrorCount(long delta) {
this.errors.addAndGet(delta);
}
+ /**
+ * Increment received count
+ */
public void incrementReceivedCount() {
this.incrementReceivedCount(1);
}
+ /**
+ * Increment received count
+ * @param delta
+ */
public void incrementReceivedCount(long delta) {
this.received.addAndGet(delta);
}
+ /**
+ * Add the time it takes to process a single datum in milliseconds
+ * @param processTime
+ */
+ public void addTime(long processTime) {
+ synchronized (this) {
+ if(processTime > this.maxTime) {
+ this.maxTime = processTime;
+ }
+ }
+ this.totalTime.addAndGet(processTime);
+ }
+
@Override
public double getErrorRate() {
if(this.received.get() == 0) {
@@ -98,4 +138,17 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
public long getNumUnhandledErrors() {
return this.errors.get();
}
+
+ @Override
+ public double getAvgTime() {
+ if(this.received.get() == 0) {
+ return 0.0;
+ }
+ return this.totalTime.get() / (double) this.received.get();
+ }
+
+ @Override
+ public long getMaxTime() {
+ return this.maxTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6916a1b7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
index 634857d..8ac2e33 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
@@ -48,6 +48,16 @@ public interface StreamsTaskCounterMXBean {
*/
public long getNumUnhandledErrors();
+ /**
+ * Returns the average time in milliseconds that the task's readCurrent, process, or write call takes to return.
+ * @return
+ */
+ public double getAvgTime();
+ /**
+ * Returns the maximum time in milliseconds that the task's readCurrent, process, or write call takes to return.
+ * @return
+ */
+ public long getMaxTime();
}