You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/10/22 02:03:23 UTC

[01/10] git commit: Created MXBean for counts and depricated DatumStatusCounter and DatumStatusCountable in core

Repository: incubator-streams
Updated Branches:
  refs/heads/master 849a2e8f9 -> 9b3227141


Created MXBean for counts and depricated DatumStatusCounter and DatumStatusCountable in core


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c030d439
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c030d439
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c030d439

Branch: refs/heads/master
Commit: c030d4393ab84fbd1cf484bf8ea06a7902a26964
Parents: 2a635df
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 17 17:13:51 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 17 17:13:51 2014 -0500

----------------------------------------------------------------------
 .../streams/core/DatumStatusCountable.java      |   1 +
 .../apache/streams/core/DatumStatusCounter.java |   2 +-
 .../local/counters/DatumStatusCounter.java      |  88 +++++++++++
 .../counters/DatumStatusCounterMXBean.java      |  43 +++++
 .../local/counters/StreamsTaskCounter.java      | 101 ++++++++++++
 .../counters/StreamsTaskCounterMXBean.java      |  53 +++++++
 ...amOnUnhandleThrowableThreadPoolExecutor.java |  17 ++
 .../local/counters/DatumStatusCounterTest.java  | 134 ++++++++++++++++
 .../local/counters/StreamsTaskCounterTest.java  | 158 +++++++++++++++++++
 9 files changed, 596 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
index 6a9da4d..3abf30d 100644
--- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
@@ -21,6 +21,7 @@ package org.apache.streams.core;
 /**
  * Created by steveblackmon on 3/24/14.
  */
+@Deprecated
 public interface DatumStatusCountable {
 
     public DatumStatusCounter getDatumStatusCounter();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
index 1bba688..2758ada 100644
--- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
@@ -17,7 +17,7 @@
  */
 
 package org.apache.streams.core;
-
+@Deprecated
 public class DatumStatusCounter
 {
     private volatile int attempted = 0;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
new file mode 100644
index 0000000..88c3a6f
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import net.jcip.annotations.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ */
+@ThreadSafe
+public class DatumStatusCounter implements DatumStatusCounterMXBean{
+
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s";
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatumStatusCounter.class);
+
+    private AtomicLong failed;
+    private AtomicLong passed;
+
+    public DatumStatusCounter(String id) {
+        this.failed = new AtomicLong(0);
+        this.passed = new AtomicLong(0);
+        try {
+            ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(this, name);
+        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+            LOGGER.error("Failed to register MXBean : {}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void incrementFailedCount() {
+        this.incrementFailedCount(1);
+    }
+
+    public void incrementFailedCount(long delta) {
+        this.failed.addAndGet(delta);
+    }
+
+    public void incrementPassedCount() {
+        this.incrementPassedCount(1);
+    }
+
+    public void incrementPassedCount(long delta) {
+        this.passed.addAndGet(delta);
+    }
+
+
+    @Override
+    public double getFailRate() {
+        double failed = this.failed.get();
+        if(failed == 0.0 && this.passed.get() == 0) {
+            return 0.0;
+        }
+        return failed / (this.passed.get() + failed);
+    }
+
+    @Override
+    public long getNumFailed() {
+        return this.failed.get();
+    }
+
+    @Override
+    public long getNumPassed() {
+        return this.passed.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
new file mode 100644
index 0000000..7cc8df4
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+/**
+ *
+ */
+public interface DatumStatusCounterMXBean {
+
+    /**
+     * Get number of failed datums
+     * @return number of failed datums
+     */
+    public long getNumFailed();
+
+    /**
+     * Get number of passed datums
+     * @return number of passed datums
+     */
+    public long getNumPassed();
+
+    /**
+     * Get the failure rate.  Calculated by num failed divided by (num passed + num failed)
+     * @return the failure rate
+     */
+    public double getFailRate();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
new file mode 100644
index 0000000..ffd9f25
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import net.jcip.annotations.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ */
+@ThreadSafe
+public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
+
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s";
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class);
+
+    private AtomicLong emitted;
+    private AtomicLong received;
+    private AtomicLong errors;
+
+    public StreamsTaskCounter(String id) {
+        this.emitted = new AtomicLong(0);
+        this.received = new AtomicLong(0);
+        this.errors = new AtomicLong(0);
+        try {
+            ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(this, name);
+        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+            LOGGER.error("Failed to register MXBean : {}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void incrementEmittedCount() {
+        this.incrementEmittedCount(1);
+    }
+
+    public void incrementEmittedCount(long delta) {
+        this.emitted.addAndGet(delta);
+    }
+
+    public void incrementErrorCount() {
+        this.incrementErrorCount(1);
+    }
+
+    public void incrementErrorCount(long delta) {
+        this.errors.addAndGet(delta);
+    }
+
+    public void incrementReceivedCount() {
+        this.incrementReceivedCount(1);
+    }
+
+    public void incrementReceivedCount(long delta) {
+        this.received.addAndGet(delta);
+    }
+
+    @Override
+    public double getErrorRate() {
+        if(this.received.get() == 0) {
+            return 0.0;
+        }
+        return (double) this.errors.get() / (double) this.received.get();
+    }
+
+    @Override
+    public long getNumEmitted() {
+        return this.emitted.get();
+    }
+
+    @Override
+    public long getNumReceived() {
+        return this.received.get();
+    }
+
+    @Override
+    public long getNumUnhandledErrors() {
+        return this.errors.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
new file mode 100644
index 0000000..634857d
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+/**
+ *
+ */
+public interface StreamsTaskCounterMXBean {
+
+    /**
+     * Get the error rate of the streams process calculated by the number of errors not handled by the {@link org.apache.streams.local.tasks.StreamsTask}
+     * divided by the number of datums received.
+     * @return error rate
+     */
+    public double getErrorRate();
+
+    /**
+     * Get the number of {@link org.apache.streams.core.StreamsDatum}s emitted by the streams process
+     * @return number of emitted datums
+     */
+    public long getNumEmitted();
+
+    /**
+     * Get the number of {@link org.apache.streams.core.StreamsDatum}s received by the streams process
+     * @return number of received datums
+     */
+    public long getNumReceived();
+
+    /**
+     * Get the number of errors that the process had to catch because the executing Provider/Processor/Writer did not
+     * catch and handle the exception
+     * @return number of handled errors
+     */
+    public long getNumUnhandledErrors();
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
index 55a26e1..ea65ac2 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.local.executors;
 
 import org.apache.streams.local.builders.LocalStreamBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
new file mode 100644
index 0000000..3a9a8dc
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.After;
+import org.junit.Test;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
+/**
+ *
+ */
+public class DatumStatusCounterTest extends RandomizedTest {
+
+    private static final String MBEAN_ID = "test_id";
+
+
+
+    /**
+     * Remove registered mbeans from previous tests
+     * @throws Exception
+     */
+    @After
+    public void unregisterMXBean() throws Exception {
+        try {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID)));
+        } catch (InstanceNotFoundException ife) {
+            //No-op
+        }
+    }
+
+    /**
+     * Test Constructor can register the counter as an mxbean with throwing an exception.
+     */
+    @Test
+    public void testConstructor() {
+        try {
+            new DatumStatusCounter(MBEAN_ID);
+        } catch (Throwable t) {
+            fail("Constructor Threw Exception : "+t.getMessage());
+        }
+    }
+
+    /**
+     * Test that you can increment passes and it returns the correct count
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testPassed() throws Exception {
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementPassedCount();
+        }
+        assertEquals(numIncrements, counter.getNumPassed());
+
+        unregisterMXBean();
+
+        counter = new DatumStatusCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementPassedCount(delta);
+        }
+        assertEquals(total, counter.getNumPassed());
+    }
+
+    /**
+     * Test that you can increment failed and it returns the correct count
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testFailed() throws Exception {
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementFailedCount();
+        }
+        assertEquals(numIncrements, counter.getNumFailed());
+
+        unregisterMXBean();
+
+        counter = new DatumStatusCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementFailedCount(delta);
+        }
+        assertEquals(total, counter.getNumFailed());
+    }
+
+
+    /**
+     * Test failure rate returns expected values
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testFailureRate() {
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        assertEquals(0.0, counter.getFailRate(), 0);
+        int failures = randomIntBetween(0, 100000);
+        int passes = randomIntBetween(0, 100000);
+        counter.incrementPassedCount(passes);
+        counter.incrementFailedCount(failures);
+        assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
new file mode 100644
index 0000000..a001845
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.After;
+import org.junit.Test;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
+/**
+ * Unit tests for {@link org.apache.streams.local.counters.StreamsTaskCounter}
+ */
+public class StreamsTaskCounterTest extends RandomizedTest {
+
+    private static final String MBEAN_ID = "test_id";
+
+    /**
+     * Remove registered mbeans from previous tests
+     * @throws Exception
+     */
+    @After
+    public void unregisterMXBean() throws Exception {
+        try {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID)));
+        } catch (InstanceNotFoundException ife) {
+            //No-op
+        }
+    }
+
+    /**
+     * Test constructor does not throw errors
+     */
+    @Test
+    public void testConstructor() {
+        try {
+            new StreamsTaskCounter(MBEAN_ID);
+        } catch (Throwable t) {
+            fail("Constructor threw error : "+t.getMessage());
+        }
+    }
+
+    /**
+     * Test emitted increments correctly and returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testEmitted() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementEmittedCount();
+        }
+        assertEquals(numIncrements, counter.getNumEmitted());
+
+        unregisterMXBean();
+
+        counter = new StreamsTaskCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementEmittedCount(delta);
+        }
+        assertEquals(total, counter.getNumEmitted());
+    }
+
+    /**
+     * Test received increments correctly and returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testReceived() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementReceivedCount();
+        }
+        assertEquals(numIncrements, counter.getNumReceived());
+
+        unregisterMXBean();
+
+        counter = new StreamsTaskCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementReceivedCount(delta);
+        }
+        assertEquals(total, counter.getNumReceived());
+    }
+
+    /**
+     * Test errors increments correctly and returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testError() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementErrorCount();
+        }
+        assertEquals(numIncrements, counter.getNumUnhandledErrors());
+
+        unregisterMXBean();
+
+        counter = new StreamsTaskCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementErrorCount(delta);
+        }
+        assertEquals(total, counter.getNumUnhandledErrors());
+    }
+
+    /**
+     * Test error rate returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testErrorRate() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        assertEquals(0.0, counter.getErrorRate(), 0);
+        int failures = randomIntBetween(0, 100000);
+        int received = randomIntBetween(0, 100000);
+        counter.incrementReceivedCount(received);
+        counter.incrementErrorCount(failures);
+        assertEquals((double)failures / (double)(received), counter.getErrorRate(), 0);
+    }
+
+}


[06/10] git commit: STREAMS-197 | Avg time now will count for providers and does not include errors in calculations

Posted by re...@apache.org.
STREAMS-197 | Avg time now will count for providers and does not include errors in calculations


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f40ce9b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f40ce9b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f40ce9b0

Branch: refs/heads/master
Commit: f40ce9b0db7bc082ce98722529f69881d85f3c8f
Parents: bfffc5f
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 20 16:00:26 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 20 16:00:26 2014 -0500

----------------------------------------------------------------------
 .../apache/streams/local/counters/StreamsTaskCounter.java   | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f40ce9b0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index c93cae9..8801df2 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -145,10 +145,15 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
 
     @Override
     public double getAvgTime() {
-        if(this.received.get() == 0) {
+        long rec = this.received.get();
+        long emit = this.emitted.get();
+        if(rec == 0 && emit == 0 ) {
             return 0.0;
+        } else if( rec == 0) { //provider instance
+            return this.totalTime.get() / (double) emit;
+        } else {
+            return this.totalTime.get() / ((double) this.received.get() - this.errors.get());
         }
-        return this.totalTime.get() / (double) this.received.get();
     }
 
     @Override


[10/10] git commit: STREAMS-197 | Added imports left out merge

Posted by re...@apache.org.
STREAMS-197 | Added imports left out merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9b322714
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9b322714
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9b322714

Branch: refs/heads/master
Commit: 9b32271419abdefda0bfd9c19f3614b223d7ca63
Parents: f958890
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Tue Oct 21 18:18:45 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Tue Oct 21 18:18:45 2014 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/streams/util/ComponentUtils.java     | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9b322714/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 169fe91..d62182f 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -22,8 +22,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+import javax.management.*;
 import java.lang.management.ManagementFactory;
 import java.util.Queue;
 import java.util.Set;


[05/10] git commit: Changed NAME_TEMPLATE

Posted by re...@apache.org.
Changed NAME_TEMPLATE


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bfffc5f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bfffc5f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bfffc5f7

Branch: refs/heads/master
Commit: bfffc5f7261b9117dd33bab7259e95e6de1f5e26
Parents: d305371
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 20 15:32:13 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 20 15:32:13 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/local/counters/StreamsTaskCounter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bfffc5f7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index a0eb349..c93cae9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
 @ThreadSafe
 public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
 
-    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s";
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s";
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class);
 
     private AtomicLong emitted;


[04/10] git commit: Merge branch 'master' into datumcountable

Posted by re...@apache.org.
Merge branch 'master' into datumcountable


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d3053713
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d3053713
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d3053713

Branch: refs/heads/master
Commit: d305371364b2340f26a66618aa614ca6f3ce6ee4
Parents: 393c2e7 6ecf46d
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Mon Oct 20 12:53:31 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Mon Oct 20 12:53:31 2014 -0500

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      |  58 +--
 .../streams/local/builders/StreamComponent.java |  21 +-
 .../streams/local/tasks/BaseStreamsTask.java    |  52 ++-
 .../streams/local/tasks/StreamsMergeTask.java   |   9 +-
 .../local/tasks/StreamsPersistWriterTask.java   |  43 +-
 .../local/tasks/StreamsProcessorTask.java       |  52 ++-
 .../local/tasks/StreamsProviderTask.java        |   8 +-
 .../apache/streams/local/tasks/StreamsTask.java |  14 +-
 .../local/builders/LocalStreamBuilderTest.java  | 399 ++++++++++++-------
 .../local/builders/ToyLocalBuilderExample.java  |   5 +-
 .../streams/local/tasks/BasicTasksTest.java     |  52 +--
 .../local/tasks/StreamsProviderTaskTest.java    |   7 +-
 .../PassthroughDatumCounterProcessor.java       |  51 ++-
 .../test/providers/NumericMessageProvider.java  |  29 +-
 .../local/test/writer/DatumCounterWriter.java   |  61 ++-
 15 files changed, 569 insertions(+), 292 deletions(-)
----------------------------------------------------------------------



[08/10] git commit: Merge pull request #10 from apache/master

Posted by re...@apache.org.
Merge pull request #10 from apache/master

Merge Apache Master 2014/10/21

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2c715505
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2c715505
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2c715505

Branch: refs/heads/master
Commit: 2c71550570dfeb20fe62a44b20e53b2a03168b73
Parents: 6a8ed61 849a2e8
Author: Ryan Ebanks <rb...@users.noreply.github.com>
Authored: Tue Oct 21 17:57:32 2014 -0500
Committer: Ryan Ebanks <rb...@users.noreply.github.com>
Committed: Tue Oct 21 17:57:32 2014 -0500

----------------------------------------------------------------------
 .../streams/local/tasks/StreamsProcessorTask.java |  2 +-
 .../local/builders/LocalStreamBuilderTest.java    | 12 ++++++++++++
 ...nUnhandledThrowableThreadPoolExecutorTest.java | 11 +++++++++++
 .../queues/ThroughputQueueMulitThreadTest.java    | 10 ++++++++++
 .../queues/ThroughputQueueSingleThreadTest.java   | 11 +++++++++++
 .../streams/local/tasks/BasicTasksTest.java       | 10 ++++++++++
 .../local/tasks/StreamsProviderTaskTest.java      | 10 ++++++++++
 .../tests/TestComponentsLocalStream.java          | 13 ++++++++++++-
 .../tests/TestExpectedDatumsPersitWriter.java     | 13 ++++++++++++-
 .../component/tests/TestFileReaderProvider.java   | 12 +++++++++++-
 .../org/apache/streams/util/ComponentUtils.java   | 18 ++++++++++++++++++
 11 files changed, 118 insertions(+), 4 deletions(-)
----------------------------------------------------------------------



[09/10] git commit: STREAMS-197 | Merged apache master and handle merge conflict

Posted by re...@apache.org.
STREAMS-197 | Merged apache master and handle merge conflict


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f9588905
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f9588905
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f9588905

Branch: refs/heads/master
Commit: f9588905deeaa4bed0e3bad0945c626202e4c211
Parents: 0006a04 2c71550
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Tue Oct 21 18:00:34 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Tue Oct 21 18:00:34 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/pom.xml       |  47 +++++++
 .../ElasticsearchMetadataUtil.java              | 106 ++++++++++++++++
 .../ElasticsearchPersistDeleter.java            |   6 +-
 .../ElasticsearchPersistUpdater.java            |   6 +-
 .../ElasticsearchPersistWriter.java             |  47 +------
 .../DatumFromMetadataAsDocumentProcessor.java   | 122 +++++++++++++++++++
 .../processor/DatumFromMetadataProcessor.java   | 103 ++++++++++++++++
 .../processor/DocumentToMetadataProcessor.java  | 100 +++++++++++++++
 .../ElasticsearchConfiguration.json             |   3 +-
 .../test/TestDatumFromMetadataProcessor.java    |  81 ++++++++++++
 .../test/TestDocumentToMetadataProcessor.java   |  63 ++++++++++
 .../test/TestElasticsearchPersistWriter.java    |  70 +++++++++++
 .../provider/TwitterTimelineProvider.java       |  14 ++-
 .../provider/TwitterTimelineProviderTest.java   |  39 ++++++
 .../local/tasks/StreamsProcessorTask.java       |   2 +-
 .../local/builders/LocalStreamBuilderTest.java  |  12 ++
 ...nhandledThrowableThreadPoolExecutorTest.java |  11 ++
 .../queues/ThroughputQueueMulitThreadTest.java  |  10 ++
 .../queues/ThroughputQueueSingleThreadTest.java |  11 ++
 .../streams/local/tasks/BasicTasksTest.java     |  10 ++
 .../local/tasks/StreamsProviderTaskTest.java    |  10 ++
 .../tests/TestComponentsLocalStream.java        |  13 +-
 .../tests/TestExpectedDatumsPersitWriter.java   |  13 +-
 .../component/tests/TestFileReaderProvider.java |  12 +-
 .../org/apache/streams/util/ComponentUtils.java |  18 ++-
 25 files changed, 868 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9588905/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --cc streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index bb65c4c,9f73560..169fe91
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@@ -108,19 -110,17 +110,33 @@@ public class ComponentUtils 
      }
  
      /**
+      * Removes all mbeans registered undered a specific domain.  Made specificly to clean up at unit tests
+      * @param domain
+      */
+     public static void removeAllMBeansOfDomain(String domain) throws Exception {
+         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+         domain = domain.endsWith(":") ? domain : domain+":";
+         ObjectName objectName = new ObjectName(domain+"*");
+         Set<ObjectName> mbeanNames = mbs.queryNames(objectName, null);
+         for(ObjectName name : mbeanNames) {
+             mbs.unregisterMBean(name);
+         }
+     }
+ 
++    /**
 +     * Attempts to register an object with local MBeanServer.  Throws runtime exception on errors.
 +     * @param name name to register bean with
 +     * @param mbean mbean to register
 +     */
 +    public static <V> void registerLocalMBean(String name, V mbean) {
 +        try {
 +            ObjectName objectName = new ObjectName(name);
 +            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 +            mbs.registerMBean(mbean, objectName);
 +        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
 +            LOGGER.error("Failed to register MXBean : {}", e);
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
  }


[03/10] git commit: Added java doc

Posted by re...@apache.org.
Added java doc


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/393c2e79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/393c2e79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/393c2e79

Branch: refs/heads/master
Commit: 393c2e79b2534bb7f8ba344b6418488c84338c87
Parents: 6916a1b
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 17 18:03:04 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 17 18:03:04 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/local/counters/StreamsTaskCounter.java    | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/393c2e79/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index e864219..a0eb349 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -42,6 +42,10 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
     @GuardedBy("this")
     private volatile long maxTime;
 
+    /**
+     *
+     * @param id
+     */
     public StreamsTaskCounter(String id) {
         this.emitted = new AtomicLong(0);
         this.received = new AtomicLong(0);


[07/10] git commit: STREAMS-197 | add util function and refactored

Posted by re...@apache.org.
STREAMS-197 | add util function and refactored


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0006a044
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0006a044
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0006a044

Branch: refs/heads/master
Commit: 0006a044f001a68ba8ceb481ff4fbcb0ba0de270
Parents: f40ce9b
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Tue Oct 21 17:26:35 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Tue Oct 21 17:26:35 2014 -0500

----------------------------------------------------------------------
 streams-runtimes/streams-runtime-local/pom.xml    |  6 +++---
 .../local/counters/DatumStatusCounter.java        | 15 +++++----------
 .../local/counters/StreamsTaskCounter.java        | 10 ++--------
 .../org/apache/streams/util/ComponentUtils.java   | 18 ++++++++++++++++++
 4 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index 0d9138d..9ee8d5b 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -46,17 +46,17 @@
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
-            <version>0.1-SNAPSHOT</version>
+            <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
-            <version>0.1-SNAPSHOT</version>
+            <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-pojo</artifactId>
-            <version>0.1-SNAPSHOT</version>
+            <version>${project.version}</version>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
index 88c3a6f..acada71 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
@@ -18,6 +18,7 @@
 package org.apache.streams.local.counters;
 
 import net.jcip.annotations.ThreadSafe;
+import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,14 +41,7 @@ public class DatumStatusCounter implements DatumStatusCounterMXBean{
     public DatumStatusCounter(String id) {
         this.failed = new AtomicLong(0);
         this.passed = new AtomicLong(0);
-        try {
-            ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, name);
-        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
-            LOGGER.error("Failed to register MXBean : {}", e);
-            throw new RuntimeException(e);
-        }
+        ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this);
     }
 
     public void incrementFailedCount() {
@@ -70,10 +64,11 @@ public class DatumStatusCounter implements DatumStatusCounterMXBean{
     @Override
     public double getFailRate() {
         double failed = this.failed.get();
-        if(failed == 0.0 && this.passed.get() == 0) {
+        double passed = this.passed.get();
+        if(failed == 0.0 && passed == 0) {
             return 0.0;
         }
-        return failed / (this.passed.get() + failed);
+        return failed / (passed + failed);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index 8801df2..68c6364 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -19,6 +19,7 @@ package org.apache.streams.local.counters;
 
 import net.jcip.annotations.GuardedBy;
 import net.jcip.annotations.ThreadSafe;
+import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,14 +53,7 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
         this.errors = new AtomicLong(0);
         this.totalTime = new AtomicLong(0);
         this.maxTime = -1;
-        try {
-            ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, name);
-        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
-            LOGGER.error("Failed to register MXBean : {}", e);
-            throw new RuntimeException(e);
-        }
+        ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 9f3c480..bb65c4c 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -22,6 +22,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.*;
+import java.lang.management.ManagementFactory;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -105,4 +107,20 @@ public class ComponentUtils {
         }
     }
 
+    /**
+     * Attempts to register an object with local MBeanServer.  Throws runtime exception on errors.
+     * @param name name to register bean with
+     * @param mbean mbean to register
+     */
+    public static <V> void registerLocalMBean(String name, V mbean) {
+        try {
+            ObjectName objectName = new ObjectName(name);
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(mbean, objectName);
+        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+            LOGGER.error("Failed to register MXBean : {}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
 }


[02/10] git commit: Added processing time

Posted by re...@apache.org.
Added processing time


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6916a1b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6916a1b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6916a1b7

Branch: refs/heads/master
Commit: 6916a1b78318f7dec9d73121e25e1f6b545c9a78
Parents: c030d43
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 17 17:53:34 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 17 17:53:34 2014 -0500

----------------------------------------------------------------------
 .../local/counters/StreamsTaskCounter.java      | 53 ++++++++++++++++++++
 .../counters/StreamsTaskCounterMXBean.java      | 10 ++++
 2 files changed, 63 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6916a1b7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
index ffd9f25..e864219 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streams.local.counters;
 
+import net.jcip.annotations.GuardedBy;
 import net.jcip.annotations.ThreadSafe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,11 +38,16 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
     private AtomicLong emitted;
     private AtomicLong received;
     private AtomicLong errors;
+    private AtomicLong totalTime;
+    @GuardedBy("this")
+    private volatile long maxTime;
 
     public StreamsTaskCounter(String id) {
         this.emitted = new AtomicLong(0);
         this.received = new AtomicLong(0);
         this.errors = new AtomicLong(0);
+        this.totalTime = new AtomicLong(0);
+        this.maxTime = -1;
         try {
             ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -52,30 +58,64 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
         }
     }
 
+    /**
+     * Increment emitted count
+     */
     public void incrementEmittedCount() {
         this.incrementEmittedCount(1);
     }
 
+    /**
+     * Increment emitted count
+     * @param delta
+     */
     public void incrementEmittedCount(long delta) {
         this.emitted.addAndGet(delta);
     }
 
+    /**
+     * Increment error count
+     */
     public void incrementErrorCount() {
         this.incrementErrorCount(1);
     }
 
+    /**
+     * Increment error count
+     * @param delta
+     */
     public void incrementErrorCount(long delta) {
         this.errors.addAndGet(delta);
     }
 
+    /**
+     * Increment received count
+     */
     public void incrementReceivedCount() {
         this.incrementReceivedCount(1);
     }
 
+    /**
+     * Increment received count
+     * @param delta
+     */
     public void incrementReceivedCount(long delta) {
         this.received.addAndGet(delta);
     }
 
+    /**
+     * Add the time it takes to process a single datum in milliseconds
+     * @param processTime
+     */
+    public void addTime(long processTime) {
+        synchronized (this) {
+            if(processTime > this.maxTime) {
+                this.maxTime = processTime;
+            }
+        }
+        this.totalTime.addAndGet(processTime);
+    }
+
     @Override
     public double getErrorRate() {
         if(this.received.get() == 0) {
@@ -98,4 +138,17 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
     public long getNumUnhandledErrors() {
         return this.errors.get();
     }
+
+    @Override
+    public double getAvgTime() {
+        if(this.received.get() == 0) {
+            return 0.0;
+        }
+        return this.totalTime.get() / (double) this.received.get();
+    }
+
+    @Override
+    public long getMaxTime() {
+        return this.maxTime;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6916a1b7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
index 634857d..8ac2e33 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
@@ -48,6 +48,16 @@ public interface StreamsTaskCounterMXBean {
      */
     public long getNumUnhandledErrors();
 
+    /**
+     * Returns the average time in milliseconds it takes the task to readCurrent, process, or write to return.
+     * @return
+     */
+    public double getAvgTime();
 
+    /**
+     * Returns the max time in milliseconds it takes the task to readCurrent, process, or write to return.
+     * @return
+     */
+    public long getMaxTime();
 
 }