You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sm...@apache.org on 2016/10/22 14:53:57 UTC
incubator-streams git commit: Trivial fixes, this closes apache/incubator-streams#311
Repository: incubator-streams
Updated Branches:
refs/heads/master 7aaba1e9b -> 6f5caa238
Trivial fixes, this closes apache/incubator-streams#311
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6f5caa23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6f5caa23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6f5caa23
Branch: refs/heads/master
Commit: 6f5caa238af20e8eb519a4877011574976fb1089
Parents: 7aaba1e
Author: smarthi <sm...@apache.org>
Authored: Sat Oct 22 10:53:10 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Sat Oct 22 10:53:10 2016 -0400
----------------------------------------------------------------------
.../elasticsearch/ElasticsearchClient.java | 3 ---
.../ElasticsearchClientManager.java | 13 +++++-------
.../ElasticsearchMetadataUtil.java | 15 ++++++-------
.../ElasticsearchPersistReader.java | 22 ++++++++++----------
.../ElasticsearchPersistWriter.java | 13 +-----------
.../elasticsearch/ElasticsearchQuery.java | 1 +
.../regex/AbstractRegexExtensionExtractor.java | 8 ++++---
7 files changed, 29 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
index 5bb0e9d..0b2b782 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
@@ -21,9 +21,6 @@ package org.apache.streams.elasticsearch;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
-/**
- * Created by sblackmon on 2/10/14.
- */
public class ElasticsearchClient {
private Client client;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
index 60ffb5f..4809334 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
@@ -36,17 +36,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-/**
- * Created by sblackmon on 2/10/14.
- */
public class ElasticsearchClientManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchClientManager.class);
- private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<String, ElasticsearchClient>();
+ private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<>();
private ElasticsearchConfiguration elasticsearchConfiguration;
@@ -85,7 +82,7 @@ public class ElasticsearchClientManager {
}
public void start() throws Exception {
- /***********************************************************************
+ /*
* Note:
* Everything in these classes is being switched to lazy loading. Within
* Heroku you only have 60 seconds to connect, and bind to the service,
@@ -130,11 +127,11 @@ public class ElasticsearchClientManager {
}
public boolean equals(Object o) {
- return EqualsBuilder.reflectionEquals(this, o, Arrays.asList(this.elasticsearchConfiguration.toString()));
+ return EqualsBuilder.reflectionEquals(this, o, Collections.singletonList(this.elasticsearchConfiguration.toString()));
}
public int hashCode() {
- return HashCodeBuilder.reflectionHashCode(this, Arrays.asList(this.elasticsearchConfiguration.toString()));
+ return HashCodeBuilder.reflectionHashCode(this, Collections.singletonList(this.elasticsearchConfiguration.toString()));
}
private synchronized void checkAndLoadClient(String clusterName) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
index 6cbd203..100b0c5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
@@ -19,15 +19,12 @@
package org.apache.streams.elasticsearch;
import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.Maps;
import org.apache.streams.core.StreamsDatum;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-/**
- * Created by sblackmon on 10/20/14.
- */
public class ElasticsearchMetadataUtil {
public static String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
@@ -100,25 +97,25 @@ public class ElasticsearchMetadataUtil {
return id;
}
- public static String getParent(StreamsDatum datum) {
+ static String getParent(StreamsDatum datum) {
String parent = null;
Map<String, Object> metadata = datum.getMetadata();
- if( parent == null && metadata != null && metadata.containsKey("parent"))
+ if(metadata != null && metadata.containsKey("parent"))
parent = (String) datum.getMetadata().get("parent");
return parent;
}
- public static String getRouting(StreamsDatum datum) {
+ static String getRouting(StreamsDatum datum) {
String routing = null;
Map<String, Object> metadata = datum.getMetadata();
- if( routing == null && metadata != null && metadata.containsKey("routing"))
+ if(metadata != null && metadata.containsKey("routing"))
routing = (String) datum.getMetadata().get("routing");
return routing;
@@ -133,7 +130,7 @@ public class ElasticsearchMetadataUtil {
public static Map<String, Object> asMap(JsonNode node) {
Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
- Map<String, Object> ret = Maps.newHashMap();
+ Map<String, Object> ret = new HashMap<>();
Map.Entry<String, JsonNode> entry;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 9103614..909f5c4 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -21,7 +21,10 @@ package org.apache.streams.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Queues;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.search.SearchHit;
import org.joda.time.DateTime;
@@ -31,19 +34,16 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-/**
- * ***********************************************************************************************************
- * Authors:
- * smashew
- * steveblackmon
- * ************************************************************************************************************
- */
-
public class ElasticsearchPersistReader implements StreamsPersistReader, Serializable {
public static final String STREAMS_ID = "ElasticsearchPersistReader";
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 49523f8..8f9c7d7 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -24,9 +24,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatus;
-import org.apache.streams.core.DatumStatusCountable;
-import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -58,7 +55,7 @@ import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
+public class ElasticsearchPersistWriter implements StreamsPersistWriter, Serializable {
public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
@@ -268,14 +265,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
}
}
- @Override
- public DatumStatusCounter getDatumStatusCounter() {
- DatumStatusCounter counters = new DatumStatusCounter();
- counters.incrementStatus(DatumStatus.SUCCESS, (int)this.totalOk.get());
- counters.incrementStatus(DatumStatus.FAIL, (int)this.totalFailed.get());
- return counters;
- }
-
private synchronized void flushInternal() {
// we do not have a working bulk request, we can just exit here.
if (this.bulkRequest == null || this.currentBatchItems.get() == 0)
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index d9e9273..3bb4b97 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -27,6 +27,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.script.Script;
+import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortBuilders;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6f5caa23/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
index e4fa0e0..0f46ccd 100644
--- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
+++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
@@ -32,7 +32,9 @@ import org.apache.streams.pojo.json.Activity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -70,7 +72,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce
} else if (entry.getDocument() instanceof ObjectNode) {
activity = mapper.convertValue(entry.getDocument(), Activity.class);
} else {
- return Lists.newArrayList();
+ return new ArrayList<>();
}
if (Strings.isNullOrEmpty(pattern)) {
prepare(null);
@@ -81,7 +83,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce
entities.add(prepareObject(key));
}
- Set<T> set = Sets.newHashSet();
+ Set<T> set = new HashSet<>();
set.addAll(entities);
entities.clear();
entities.addAll(set);
@@ -122,7 +124,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce
if(extensions.containsKey(extensionKey) && extensions.get(extensionKey) != null) {
hashtags = Sets.newHashSet((Iterable<T>) extensions.get(extensionKey));
} else {
- hashtags = Sets.newHashSet();
+ hashtags = new HashSet<>();
}
extensions.put(extensionKey, hashtags);