You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2016/11/23 16:12:24 UTC
incubator-unomi git commit: UNOMI-63 Use ElasticSearch BulkProcessing
to perform segment updates - Switch to TransportClient to hopefully make
load-balancing work better. This might cause a minor slowdown in standalone
performance.
Repository: incubator-unomi
Updated Branches:
refs/heads/feature-UNOMI-28-ES2X 661daeea5 -> 1075a02cf
UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates
- Switch to TransportClient to hopefully make load-balancing work better. This might cause a minor slowdown in standalone performance.
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/1075a02c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/1075a02c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/1075a02c
Branch: refs/heads/feature-UNOMI-28-ES2X
Commit: 1075a02cfeb6a4c59d935185a3f1d81fc5159f06
Parents: 661daee
Author: Serge Huber <sh...@apache.org>
Authored: Wed Nov 23 17:12:16 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Wed Nov 23 17:12:16 2016 +0100
----------------------------------------------------------------------
.../ElasticSearchPersistenceServiceImpl.java | 26 ++++++++++++++++----
1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/1075a02c/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 733b3a9..794b03b 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -53,10 +53,12 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
+import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.DistanceUnit;
@@ -96,9 +98,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
+import java.net.*;
import java.nio.file.Paths;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -146,6 +146,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public static final String ELASTICSEARCH_NETWORK_HOST = "network.host";
private Node node;
+ private Client nodeClient;
private Client client;
private BulkProcessor bulkProcessor;
private String clusterName;
@@ -356,15 +357,26 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
}
node = nodeBuilder().settings(settingsBuilder).node();
- client = node.client();
+ nodeClient = node.client();
+
logger.info("Waiting for ElasticSearch to start...");
- client.admin().cluster().prepareHealth()
+ nodeClient.admin().cluster().prepareHealth()
.setWaitForGreenStatus()
.get();
logger.info("Cluster status is GREEN");
+ try {
+ Settings transportSettings = Settings.settingsBuilder()
+ .put(CLUSTER_NAME, clusterName).build();
+ client = TransportClient.builder().settings(transportSettings).build()
+ .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(address), 9300));
+ } catch (UnknownHostException e) {
+ logger.error("Error resolving address " + address + " ElasticSearch transport client not connected, using internal client instead", e);
+ client = nodeClient;
+ }
+
// @todo is there a better way to detect index existence than to wait for it to startup ?
boolean indexExists = false;
int tries = 0;
@@ -570,6 +582,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
logger.error("Error waiting for bulk operations to flush !", e);
}
}
+ if (nodeClient != client) {
+ client.close();
+ }
+ nodeClient.close();
node.close();
return null;
}