You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2016/11/23 16:12:24 UTC

incubator-unomi git commit: UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates - Switch to TransportClient to hopefully make load-balancing work better. This might cause a minor slowdown in standalone performance.

Repository: incubator-unomi
Updated Branches:
  refs/heads/feature-UNOMI-28-ES2X 661daeea5 -> 1075a02cf


UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates
- Switch to TransportClient to hopefully make load-balancing work better. This might cause a minor slowdown in standalone performance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/1075a02c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/1075a02c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/1075a02c

Branch: refs/heads/feature-UNOMI-28-ES2X
Commit: 1075a02cfeb6a4c59d935185a3f1d81fc5159f06
Parents: 661daee
Author: Serge Huber <sh...@apache.org>
Authored: Wed Nov 23 17:12:16 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Wed Nov 23 17:12:16 2016 +0100

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 26 ++++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/1075a02c/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 733b3a9..794b03b 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -53,10 +53,12 @@ import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
+import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsException;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.DistanceUnit;
@@ -96,9 +98,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
+import java.net.*;
 import java.nio.file.Paths;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -146,6 +146,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     public static final String ELASTICSEARCH_NETWORK_HOST = "network.host";
 
     private Node node;
+    private Client nodeClient;
     private Client client;
     private BulkProcessor bulkProcessor;
     private String clusterName;
@@ -356,15 +357,26 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                 }
 
                 node = nodeBuilder().settings(settingsBuilder).node();
-                client = node.client();
+                nodeClient = node.client();
+
                 logger.info("Waiting for ElasticSearch to start...");
 
-                client.admin().cluster().prepareHealth()
+                nodeClient.admin().cluster().prepareHealth()
                         .setWaitForGreenStatus()
                         .get();
 
                 logger.info("Cluster status is GREEN");
 
+                try {
+                    Settings transportSettings = Settings.settingsBuilder()
+                            .put(CLUSTER_NAME, clusterName).build();
+                    client = TransportClient.builder().settings(transportSettings).build()
+                            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(address), 9300));
+                } catch (UnknownHostException e) {
+                    logger.error("Error resolving address " + address + " ElasticSearch transport client not connected, using internal client instead", e);
+                    client = nodeClient;
+                }
+
                 // @todo is there a better way to detect index existence than to wait for it to startup ?
                 boolean indexExists = false;
                 int tries = 0;
@@ -570,6 +582,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                         logger.error("Error waiting for bulk operations to flush !", e);
                     }
                 }
+                if (nodeClient != client) {
+                    client.close();
+                }
+                nodeClient.close();
                 node.close();
                 return null;
             }