You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sm...@apache.org on 2016/10/22 14:53:57 UTC

incubator-streams git commit: Trivial fixes, this closes apache/incubator-streams#311

Repository: incubator-streams
Updated Branches:
  refs/heads/master 7aaba1e9b -> 6f5caa238


Trivial fixes, this closes apache/incubator-streams#311


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6f5caa23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6f5caa23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6f5caa23

Branch: refs/heads/master
Commit: 6f5caa238af20e8eb519a4877011574976fb1089
Parents: 7aaba1e
Author: smarthi <sm...@apache.org>
Authored: Sat Oct 22 10:53:10 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sat Oct 22 10:53:10 2016 -0400

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchClient.java      |  3 ---
 .../ElasticsearchClientManager.java             | 13 +++++-------
 .../ElasticsearchMetadataUtil.java              | 15 ++++++-------
 .../ElasticsearchPersistReader.java             | 22 ++++++++++----------
 .../ElasticsearchPersistWriter.java             | 13 +-----------
 .../elasticsearch/ElasticsearchQuery.java       |  1 +
 .../regex/AbstractRegexExtensionExtractor.java  |  8 ++++---
 7 files changed, 29 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
index 5bb0e9d..0b2b782 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
@@ -21,9 +21,6 @@ package org.apache.streams.elasticsearch;
 import org.elasticsearch.Version;
 import org.elasticsearch.client.Client;
 
-/**
- * Created by sblackmon on 2/10/14.
- */
 public class ElasticsearchClient {
 
     private Client client;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
index 60ffb5f..4809334 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
@@ -36,17 +36,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
-/**
- * Created by sblackmon on 2/10/14.
- */
 public class ElasticsearchClientManager {
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchClientManager.class);
-    private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<String, ElasticsearchClient>();
+    private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<>();
 
     private ElasticsearchConfiguration elasticsearchConfiguration;
 
@@ -85,7 +82,7 @@ public class ElasticsearchClientManager {
     }
 
     public void start() throws Exception {
-        /***********************************************************************
+        /*
          * Note:
          * Everything in these classes is being switched to lazy loading. Within
          * Heroku you only have 60 seconds to connect, and bind to the service,
@@ -130,11 +127,11 @@ public class ElasticsearchClientManager {
     }
 
     public boolean equals(Object o) {
-        return EqualsBuilder.reflectionEquals(this, o, Arrays.asList(this.elasticsearchConfiguration.toString()));
+        return EqualsBuilder.reflectionEquals(this, o, Collections.singletonList(this.elasticsearchConfiguration.toString()));
     }
 
     public int hashCode() {
-        return HashCodeBuilder.reflectionHashCode(this, Arrays.asList(this.elasticsearchConfiguration.toString()));
+        return HashCodeBuilder.reflectionHashCode(this, Collections.singletonList(this.elasticsearchConfiguration.toString()));
     }
 
     private synchronized void checkAndLoadClient(String clusterName) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
index 6cbd203..100b0c5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
@@ -19,15 +19,12 @@
 package org.apache.streams.elasticsearch;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.Maps;
 import org.apache.streams.core.StreamsDatum;
 
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-/**
- * Created by sblackmon on 10/20/14.
- */
 public class ElasticsearchMetadataUtil {
 
     public static String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
@@ -100,25 +97,25 @@ public class ElasticsearchMetadataUtil {
         return id;
     }
 
-    public static String getParent(StreamsDatum datum) {
+    static String getParent(StreamsDatum datum) {
 
         String parent = null;
 
         Map<String, Object> metadata = datum.getMetadata();
 
-        if( parent == null && metadata != null && metadata.containsKey("parent"))
+        if(metadata != null && metadata.containsKey("parent"))
             parent = (String) datum.getMetadata().get("parent");
 
         return parent;
     }
 
-    public static String getRouting(StreamsDatum datum) {
+    static String getRouting(StreamsDatum datum) {
 
         String routing = null;
 
         Map<String, Object> metadata = datum.getMetadata();
 
-        if( routing == null && metadata != null && metadata.containsKey("routing"))
+        if(metadata != null && metadata.containsKey("routing"))
             routing = (String) datum.getMetadata().get("routing");
 
         return routing;
@@ -133,7 +130,7 @@ public class ElasticsearchMetadataUtil {
     public static Map<String, Object> asMap(JsonNode node) {
 
         Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
-        Map<String, Object> ret = Maps.newHashMap();
+        Map<String, Object> ret = new HashMap<>();
 
         Map.Entry<String, JsonNode> entry;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 9103614..909f5c4 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -21,7 +21,10 @@ package org.apache.streams.elasticsearch;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Queues;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.search.SearchHit;
 import org.joda.time.DateTime;
@@ -31,19 +34,16 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-/**
- * ***********************************************************************************************************
- * Authors:
- * smashew
- * steveblackmon
- * ************************************************************************************************************
- */
-
 public class ElasticsearchPersistReader implements StreamsPersistReader, Serializable {
 
     public static final String STREAMS_ID = "ElasticsearchPersistReader";

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 49523f8..8f9c7d7 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -24,9 +24,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatus;
-import org.apache.streams.core.DatumStatusCountable;
-import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -58,7 +55,7 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
+public class ElasticsearchPersistWriter implements StreamsPersistWriter, Serializable {
 
     public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
 
@@ -268,14 +265,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         }
     }
 
-    @Override
-    public DatumStatusCounter getDatumStatusCounter() {
-        DatumStatusCounter counters = new DatumStatusCounter();
-        counters.incrementStatus(DatumStatus.SUCCESS, (int)this.totalOk.get());
-        counters.incrementStatus(DatumStatus.FAIL, (int)this.totalFailed.get());
-        return counters;
-    }
-
     private synchronized void flushInternal() {
         // we do not have a working bulk request, we can just exit here.
         if (this.bulkRequest == null || this.currentBatchItems.get() == 0)

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index d9e9273..3bb4b97 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -27,6 +27,7 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.script.Script;
+import org.elasticsearch.search.Scroll;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
index e4fa0e0..0f46ccd 100644
--- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
+++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
@@ -32,7 +32,9 @@ import org.apache.streams.pojo.json.Activity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -70,7 +72,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce
         } else if (entry.getDocument() instanceof ObjectNode) {
             activity = mapper.convertValue(entry.getDocument(), Activity.class);
         } else {
-            return Lists.newArrayList();
+            return new ArrayList<>();
         }
         if (Strings.isNullOrEmpty(pattern)) {
             prepare(null);
@@ -81,7 +83,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce
             entities.add(prepareObject(key));
         }
 
-        Set<T> set = Sets.newHashSet();
+        Set<T> set = new HashSet<>();
         set.addAll(entities);
         entities.clear();
         entities.addAll(set);
@@ -122,7 +124,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce
         if(extensions.containsKey(extensionKey) && extensions.get(extensionKey) != null) {
             hashtags = Sets.newHashSet((Iterable<T>) extensions.get(extensionKey));
         } else {
-            hashtags = Sets.newHashSet();
+            hashtags = new HashSet<>();
         }
 
         extensions.put(extensionKey, hashtags);