You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2016/11/14 16:26:00 UTC

[38/38] incubator-unomi git commit: UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates - Fix merge issues with master - Implement bulk processor support for ElasticSearch update operations (indexing operations not yet integrated because of IndexNotFoundException handling)

UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates
- Fix merge issues with master
- Implement bulk processor support for ElasticSearch update operations (indexing operations not yet integrated because of IndexNotFoundException handling)
- Minor logging improvements in SegmentService


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/285d4cc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/285d4cc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/285d4cc6

Branch: refs/heads/UNOMI-28-ES-2-X-UPGRADE
Commit: 285d4cc6dbcce938f094188fa3809f93fb9dc52b
Parents: f0bd45f
Author: Serge Huber <sh...@apache.org>
Authored: Mon Nov 14 17:25:05 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Mon Nov 14 17:25:05 2016 +0100

----------------------------------------------------------------------
 package/pom.xml                                 |   5 +-
 .../ElasticSearchPersistenceServiceImpl.java    | 133 +++++++++++++++++--
 .../services/services/SegmentServiceImpl.java   |   8 +-
 3 files changed, 130 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/285d4cc6/package/pom.xml
----------------------------------------------------------------------
diff --git a/package/pom.xml b/package/pom.xml
index 5de850b..d83f4c3 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -123,9 +123,6 @@
     <profiles>
         <profile>
             <id>src</id>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
             <build>
                 <plugins>
                     <plugin>
@@ -308,7 +305,7 @@
                     </plugin>
                 </plugins>
             </build>
+        </profile>
     </profiles>
 
-
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/285d4cc6/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 16f1bc6..d13f7b6 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -40,8 +40,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.bulk.*;
 import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
 import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
 import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
@@ -51,12 +50,15 @@ import org.elasticsearch.action.percolate.PercolateResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsException;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.DistanceUnit;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -101,6 +103,7 @@ import java.nio.file.Paths;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
 
@@ -114,6 +117,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
     public static final String CONTEXTSERVER_PORT = "contextserver.port";
     public static final String CONTEXTSERVER_SECURE_ADDRESS = "contextserver.secureAddress";
     public static final String CONTEXTSERVER_SECURE_PORT = "contextserver.securePort";
+    public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME = "contextserver.elasticsearch.bulkprocessor.name";
+    public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST = "contextserver.elasticsearch.bulkprocessor.concurrentRequests";
+    public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS = "contextserver.elasticsearch.bulkprocessor.bulkActions";
+    public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE = "contextserver.elasticsearch.bulkprocessor.bulkSize";
+    public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL = "contextserver.elasticsearch.bulkprocessor.flushInterval";
+    public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY = "contextserver.elasticsearch.bulkprocessor.backoffPolicy";
     public static final String KARAF_HOME = "karaf.home";
     public static final String ELASTICSEARCH_HOME_DIRECTORY = "elasticsearch";
     public static final String ELASTICSEARCH_PLUGINS_DIRECTORY = ELASTICSEARCH_HOME_DIRECTORY + "/plugins";
@@ -136,6 +145,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
     private Node node;
     private Client client;
+    private BulkProcessor bulkProcessor;
     private String clusterName;
     private String indexName;
     private String monthlyIndexNumberOfShards;
@@ -351,6 +361,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
 
                 getMonthlyIndex(new Date(), true);
 
+                if (client != null && bulkProcessor == null) {
+                    bulkProcessor = getBulkProcessor();
+                }
+
                 return null;
             }
         }.executeInClassLoader();
@@ -388,11 +402,104 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
         logger.info(this.getClass().getName() + " service started successfully.");
     }
 
+    public BulkProcessor getBulkProcessor() {
+        if (bulkProcessor != null) {
+            return bulkProcessor;
+        }
+        BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
+                client,
+                new BulkProcessor.Listener() {
+                    @Override
+                    public void beforeBulk(long executionId,
+                                           BulkRequest request) {
+                        logger.debug("Before Bulk");
+                    }
+
+                    @Override
+                    public void afterBulk(long executionId,
+                                          BulkRequest request,
+                                          BulkResponse response) {
+                        logger.debug("After Bulk");
+                    }
+
+                    @Override
+                    public void afterBulk(long executionId,
+                                          BulkRequest request,
+                                          Throwable failure) {
+                        logger.error("After Bulk (failure)", failure);
+                        // we could add index creation here in the case of index seperation by dates.
+                    }
+                });
+        if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME) != null) {
+            String name = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME);
+            if (name != null && name.length() > 0) {
+                bulkProcessorBuilder.setName(name);
+            }
+        }
+        if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST) != null) {
+            String concurrentRequestsStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST);
+            int concurrentRequests = Integer.parseInt(concurrentRequestsStr);
+            if (concurrentRequests > 1) {
+                bulkProcessorBuilder.setConcurrentRequests(concurrentRequests);
+            }
+        }
+        if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS) != null) {
+            String bulkActionsStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS);
+            int bulkActions = Integer.parseInt(bulkActionsStr);
+            bulkProcessorBuilder.setBulkActions(bulkActions);
+        }
+        if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE) != null) {
+            String bulkSizeStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE);
+            bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkSizeStr, new ByteSizeValue(5, ByteSizeUnit.MB), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE));
+        }
+        if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL) != null) {
+            String flushIntervalStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL);
+            bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(flushIntervalStr, null, CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL));
+        } else {
+            // in ElasticSearch this defaults to null, but we would like to set a value to 5 seconds by default
+            bulkProcessorBuilder.setFlushInterval(new TimeValue(5, TimeUnit.SECONDS));
+        }
+        if (System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY) != null) {
+            String backoffPolicyStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY);
+            if (backoffPolicyStr != null && backoffPolicyStr.length() > 0) {
+                backoffPolicyStr = backoffPolicyStr.toLowerCase();
+                if ("nobackoff".equals(backoffPolicyStr)) {
+                    bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.noBackoff());
+                } else if (backoffPolicyStr.startsWith("constant(")) {
+                    int paramStartPos = backoffPolicyStr.indexOf("constant(" + "constant(".length());
+                    int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
+                    int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
+                    TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY );
+                    int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos));
+                    bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, maxNumberOfRetries));
+                } else if (backoffPolicyStr.startsWith("exponential")) {
+                    if (!backoffPolicyStr.contains("(")) {
+                        bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff());
+                    } else {
+                        // we detected parameters, must process them.
+                        int paramStartPos = backoffPolicyStr.indexOf("exponential(" + "exponential(".length());
+                        int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos);
+                        int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos);
+                        TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY );
+                        int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos));
+                        bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(delay, maxNumberOfRetries));
+                    }
+                }
+            }
+        }
+
+        bulkProcessor = bulkProcessorBuilder.build();
+        return bulkProcessor;
+    }
+
     public void stop() {
 
         new InClassLoaderExecute<Object>() {
             protected Object execute(Object... args) {
                 logger.info("Closing ElasticSearch persistence backend...");
+                if (bulkProcessor != null) {
+                    bulkProcessor.close();
+                }
                 node.close();
                 return null;
             }
@@ -605,9 +712,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                     String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
                             (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
 
-                    client.prepareUpdate(index, itemType, itemId).setDoc(source)
-                            .execute()
-                            .actionGet();
+                    if (bulkProcessor == null) {
+                        client.prepareUpdate(index, itemType, itemId).setDoc(source)
+                                .execute()
+                                .actionGet();
+                    } else {
+                        UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setDoc(source).request();
+                        bulkProcessor.add(updateRequest);
+                    }
                     return true;
                 } catch (IndexNotFoundException e) {
                     logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
@@ -632,9 +744,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
                             (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
 
                     Script actualScript = new Script(script, ScriptService.ScriptType.INLINE, null, scriptParams);
-                    client.prepareUpdate(index, itemType, itemId).setScript(actualScript)
-                            .execute()
-                            .actionGet();
+                    if (bulkProcessor == null) {
+                        client.prepareUpdate(index, itemType, itemId).setScript(actualScript)
+                                .execute()
+                                .actionGet();
+                    } else {
+                        UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).request();
+                        bulkProcessor.add(updateRequest);
+                    }
                     return true;
                 } catch (IndexNotFoundException e) {
                     logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/285d4cc6/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
index 7263b28..06b83b0 100644
--- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
@@ -812,7 +812,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
             }
         }
 
-        logger.info("Profiles past condition updated in {}", System.currentTimeMillis()-t);
+        logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis()-t);
     }
 
     private void updateExistingProfilesForSegment(Segment segment) {
@@ -854,7 +854,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
                 persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments());
             }
         }
-        logger.info("Profiles updated in {}", System.currentTimeMillis()-t);
+        logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
     }
 
     private void updateExistingProfilesForScoring(Scoring scoring) {
@@ -888,7 +888,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
                 eventService.send(entries.next().getValue());
             }
         }
-        logger.info("Profiles updated in {}", System.currentTimeMillis()-t);
+        logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
     }
 
     private void updateExistingProfilesForRemovedScoring(String scoringId) {
@@ -905,7 +905,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
         for (Profile profileToRemove : previousProfiles) {
             persistenceService.updateWithScript(profileToRemove.getItemId(), null, Profile.class, "ctx._source.scores.remove(scoringId)", scriptParams);
         }
-        logger.info("Profiles updated in {}", System.currentTimeMillis()-t);
+        logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
     }
 
     private String getMD5(String md5) {