You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/08 21:13:12 UTC

[21/38] incubator-streams git commit: Merged apache master

Merged apache master


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/377fb897
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/377fb897
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/377fb897

Branch: refs/heads/STREAMS-49
Commit: 377fb89717f34e6e109cf2655d6e3e335a494499
Parents: 015fade 5c2e832
Author: Ryan Ebanks <ry...@gmail.com>
Authored: Fri Oct 31 16:32:16 2014 -0500
Committer: Ryan Ebanks <ry...@gmail.com>
Committed: Fri Oct 31 16:32:16 2014 -0500

----------------------------------------------------------------------
 streams-contrib/streams-amazon-aws/pom.xml      |   2 +-
 .../org/apache/streams/s3/S3Configurator.java   |  55 ++++-----
 .../org/apache/streams/s3/S3PersistReader.java  |  11 +-
 .../org/apache/streams/s3/S3PersistWriter.java  |  10 +-
 .../org/apache/streams/s3/S3Configuration.json  |  18 ++-
 .../streams-persist-elasticsearch/pom.xml       |  47 +++++++
 .../ElasticsearchMetadataUtil.java              | 106 ++++++++++++++++
 .../ElasticsearchPersistDeleter.java            |   6 +-
 .../ElasticsearchPersistUpdater.java            |   6 +-
 .../ElasticsearchPersistWriter.java             |  47 +------
 .../DatumFromMetadataAsDocumentProcessor.java   | 122 +++++++++++++++++++
 .../processor/DatumFromMetadataProcessor.java   | 103 ++++++++++++++++
 .../processor/DocumentToMetadataProcessor.java  | 100 +++++++++++++++
 .../ElasticsearchConfiguration.json             |   3 +-
 .../test/TestDatumFromMetadataProcessor.java    |  81 ++++++++++++
 .../test/TestDocumentToMetadataProcessor.java   |  63 ++++++++++
 .../test/TestElasticsearchPersistWriter.java    |  70 +++++++++++
 .../google-gplus/pom.xml                        |   2 +-
 .../processor/GooglePlusTypeConverter.java      |  91 ++++++++++++++
 .../util/GPlusPersonDeserializer.java           | 107 ++++++++++++++++
 .../serializer/util/GooglePlusActivityUtil.java | 115 +++++++++++++++++
 .../google/gplus/GooglePlusPersonSerDeTest.java |  97 +++++++++++++++
 .../processor/GooglePlusTypeConverterTest.java  |  98 +++++++++++++++
 .../test/resources/google_plus_person_jsons.txt |   2 +
 .../provider/TwitterTimelineProvider.java       |  14 ++-
 .../provider/TwitterTimelineProviderTest.java   |  39 ++++++
 streams-runtimes/streams-runtime-local/pom.xml  |   6 +-
 .../local/counters/DatumStatusCounter.java      |  15 +--
 .../local/counters/StreamsTaskCounter.java      |  10 +-
 .../local/tasks/StreamsProcessorTask.java       |   3 +-
 .../local/builders/LocalStreamBuilderTest.java  |  12 ++
 ...nhandledThrowableThreadPoolExecutorTest.java |  11 ++
 .../queues/ThroughputQueueMulitThreadTest.java  |  10 ++
 .../queues/ThroughputQueueSingleThreadTest.java |  11 ++
 .../streams/local/tasks/BasicTasksTest.java     |  15 +--
 .../local/tasks/StreamsProviderTaskTest.java    |  10 ++
 .../tests/TestComponentsLocalStream.java        |  13 +-
 .../tests/TestExpectedDatumsPersitWriter.java   |  13 +-
 .../component/tests/TestFileReaderProvider.java |  12 +-
 .../org/apache/streams/util/ComponentUtils.java |  33 +++++
 40 files changed, 1455 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/377fb897/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --cc streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 8d66847,1bb565d..33c5827
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@@ -119,15 -117,11 +119,14 @@@ public class StreamsProcessorTask exten
                      Thread.currentThread().interrupt();
                  }
                  if(datum != null) {
 +                    this.counter.incrementReceivedCount();
                      try {
 +                        long startTime = System.currentTimeMillis();
                          List<StreamsDatum> output = this.processor.process(datum);
 +                        this.counter.addTime(System.currentTimeMillis() - startTime);
                          if(output != null) {
                              for(StreamsDatum outDatum : output) {
-                                 super.addToOutgoingQueue(datum);
-                                 this.counter.incrementEmittedCount();
+                                 super.addToOutgoingQueue(outDatum);
                                  statusCounter.incrementStatus(DatumStatus.SUCCESS);
                              }
                          }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/377fb897/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/377fb897/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
----------------------------------------------------------------------
diff --cc streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
index f62250d,2d28602..a0e28cd
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
@@@ -42,18 -38,12 +43,13 @@@ import static org.junit.Assert.*
  public class BasicTasksTest {
  
  
 +    private static final String MBEAN_ID = "test_bean";
- 
-     /**
-      * Remove registered mbeans from previous tests
-      * @throws Exception
-      */
      @After
-     public void unregisterMXBean() throws Exception {
+     public void removeLocalMBeans() {
          try {
-             ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID)));
-         } catch (InstanceNotFoundException ife) {
-             //No-op
+             ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
+         } catch (Exception e) {
+             //No op.  proceed to next test
          }
      }