You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by re...@apache.org on 2014/10/22 02:03:31 UTC

[09/10] git commit: STREAMS-197 | Merged apache master and handle merge conflict

STREAMS-197 | Merged apache master and handle merge conflict


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f9588905
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f9588905
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f9588905

Branch: refs/heads/master
Commit: f9588905deeaa4bed0e3bad0945c626202e4c211
Parents: 0006a04 2c71550
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Tue Oct 21 18:00:34 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Tue Oct 21 18:00:34 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/pom.xml       |  47 +++++++
 .../ElasticsearchMetadataUtil.java              | 106 ++++++++++++++++
 .../ElasticsearchPersistDeleter.java            |   6 +-
 .../ElasticsearchPersistUpdater.java            |   6 +-
 .../ElasticsearchPersistWriter.java             |  47 +------
 .../DatumFromMetadataAsDocumentProcessor.java   | 122 +++++++++++++++++++
 .../processor/DatumFromMetadataProcessor.java   | 103 ++++++++++++++++
 .../processor/DocumentToMetadataProcessor.java  | 100 +++++++++++++++
 .../ElasticsearchConfiguration.json             |   3 +-
 .../test/TestDatumFromMetadataProcessor.java    |  81 ++++++++++++
 .../test/TestDocumentToMetadataProcessor.java   |  63 ++++++++++
 .../test/TestElasticsearchPersistWriter.java    |  70 +++++++++++
 .../provider/TwitterTimelineProvider.java       |  14 ++-
 .../provider/TwitterTimelineProviderTest.java   |  39 ++++++
 .../local/tasks/StreamsProcessorTask.java       |   2 +-
 .../local/builders/LocalStreamBuilderTest.java  |  12 ++
 ...nhandledThrowableThreadPoolExecutorTest.java |  11 ++
 .../queues/ThroughputQueueMulitThreadTest.java  |  10 ++
 .../queues/ThroughputQueueSingleThreadTest.java |  11 ++
 .../streams/local/tasks/BasicTasksTest.java     |  10 ++
 .../local/tasks/StreamsProviderTaskTest.java    |  10 ++
 .../tests/TestComponentsLocalStream.java        |  13 +-
 .../tests/TestExpectedDatumsPersitWriter.java   |  13 +-
 .../component/tests/TestFileReaderProvider.java |  12 +-
 .../org/apache/streams/util/ComponentUtils.java |  18 ++-
 25 files changed, 868 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f9588905/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --cc streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index bb65c4c,9f73560..169fe91
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@@ -108,19 -110,17 +110,33 @@@ public class ComponentUtils 
      }
  
      /**
+      * Removes all mbeans registered undered a specific domain.  Made specificly to clean up at unit tests
+      * @param domain
+      */
+     public static void removeAllMBeansOfDomain(String domain) throws Exception {
+         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+         domain = domain.endsWith(":") ? domain : domain+":";
+         ObjectName objectName = new ObjectName(domain+"*");
+         Set<ObjectName> mbeanNames = mbs.queryNames(objectName, null);
+         for(ObjectName name : mbeanNames) {
+             mbs.unregisterMBean(name);
+         }
+     }
+ 
++    /**
 +     * Attempts to register an object with local MBeanServer.  Throws runtime exception on errors.
 +     * @param name name to register bean with
 +     * @param mbean mbean to register
 +     */
 +    public static <V> void registerLocalMBean(String name, V mbean) {
 +        try {
 +            ObjectName objectName = new ObjectName(name);
 +            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 +            mbs.registerMBean(mbean, objectName);
 +        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
 +            LOGGER.error("Failed to register MXBean : {}", e);
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
  }