You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@unomi.apache.org by sh...@apache.org on 2016/11/14 16:26:00 UTC
[38/38] incubator-unomi git commit: UNOMI-63 Use ElasticSearch
BulkProcessing to perform segment updates - Fix merge issues with master -
Implement bulk processor support for ElasticSearch update operations
(indexing operations not yet integrated because of IndexNotFoundException handling)
UNOMI-63 Use ElasticSearch BulkProcessing to perform segment updates
- Fix merge issues with master
- Implement bulk processor support for ElasticSearch update operations (indexing operations not yet integrated because of IndexNotFoundException handling)
- Minor logging improvements in SegmentService
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/285d4cc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/285d4cc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/285d4cc6
Branch: refs/heads/UNOMI-28-ES-2-X-UPGRADE
Commit: 285d4cc6dbcce938f094188fa3809f93fb9dc52b
Parents: f0bd45f
Author: Serge Huber <sh...@apache.org>
Authored: Mon Nov 14 17:25:05 2016 +0100
Committer: Serge Huber <sh...@apache.org>
Committed: Mon Nov 14 17:25:05 2016 +0100
----------------------------------------------------------------------
package/pom.xml | 5 +-
.../ElasticSearchPersistenceServiceImpl.java | 133 +++++++++++++++++--
.../services/services/SegmentServiceImpl.java | 8 +-
3 files changed, 130 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/285d4cc6/package/pom.xml
----------------------------------------------------------------------
diff --git a/package/pom.xml b/package/pom.xml
index 5de850b..d83f4c3 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -123,9 +123,6 @@
<profiles>
<profile>
<id>src</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
<build>
<plugins>
<plugin>
@@ -308,7 +305,7 @@
</plugin>
</plugins>
</build>
+ </profile>
</profiles>
-
</project>
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/285d4cc6/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 16f1bc6..d13f7b6 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -40,8 +40,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
@@ -51,12 +50,15 @@ import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
@@ -101,6 +103,7 @@ import java.nio.file.Paths;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
@@ -114,6 +117,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
public static final String CONTEXTSERVER_PORT = "contextserver.port";
public static final String CONTEXTSERVER_SECURE_ADDRESS = "contextserver.secureAddress";
public static final String CONTEXTSERVER_SECURE_PORT = "contextserver.securePort";
+ public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME = "contextserver.elasticsearch.bulkprocessor.name";
+ public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST = "contextserver.elasticsearch.bulkprocessor.concurrentRequests";
+ public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS = "contextserver.elasticsearch.bulkprocessor.bulkActions";
+ public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE = "contextserver.elasticsearch.bulkprocessor.bulkSize";
+ public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL = "contextserver.elasticsearch.bulkprocessor.flushInterval";
+ public static final String CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY = "contextserver.elasticsearch.bulkprocessor.backoffPolicy";
public static final String KARAF_HOME = "karaf.home";
public static final String ELASTICSEARCH_HOME_DIRECTORY = "elasticsearch";
public static final String ELASTICSEARCH_PLUGINS_DIRECTORY = ELASTICSEARCH_HOME_DIRECTORY + "/plugins";
@@ -136,6 +145,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
private Node node;
private Client client;
+ private BulkProcessor bulkProcessor;
private String clusterName;
private String indexName;
private String monthlyIndexNumberOfShards;
@@ -351,6 +361,10 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
getMonthlyIndex(new Date(), true);
+ if (client != null && bulkProcessor == null) {
+ bulkProcessor = getBulkProcessor();
+ }
+
return null;
}
}.executeInClassLoader();
@@ -388,11 +402,104 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
logger.info(this.getClass().getName() + " service started successfully.");
}
+    /**
+     * Returns the bulk processor held by this service, building it on first call.
+     *
+     * The processor is configured from the following optional system properties:
+     * <ul>
+     * <li>{@link #CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME}: processor name, ignored when empty</li>
+     * <li>{@link #CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST}: integer, only applied when greater than 1</li>
+     * <li>{@link #CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS}: integer</li>
+     * <li>{@link #CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE}: byte size value</li>
+     * <li>{@link #CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL}: time value, 5 seconds when not set</li>
+     * <li>{@link #CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY}: <code>nobackoff</code>,
+     * <code>constant(delay,maxNumberOfRetries)</code>, <code>exponential</code> or
+     * <code>exponential(delay,maxNumberOfRetries)</code></li>
+     * </ul>
+     *
+     * @return the existing bulk processor if there is one, otherwise a newly built one
+     * @throws NumberFormatException if an integer property cannot be parsed
+     * @throws IllegalArgumentException if the back-off policy arguments are malformed
+     */
+    public BulkProcessor getBulkProcessor() {
+        if (bulkProcessor != null) {
+            return bulkProcessor;
+        }
+        BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
+                client,
+                new BulkProcessor.Listener() {
+                    @Override
+                    public void beforeBulk(long executionId,
+                                           BulkRequest request) {
+                        logger.debug("Before Bulk");
+                    }
+
+                    @Override
+                    public void afterBulk(long executionId,
+                                          BulkRequest request,
+                                          BulkResponse response) {
+                        logger.debug("After Bulk");
+                    }
+
+                    @Override
+                    public void afterBulk(long executionId,
+                                          BulkRequest request,
+                                          Throwable failure) {
+                        logger.error("After Bulk (failure)", failure);
+                        // we could add index creation here in the case of index separation by dates.
+                    }
+                });
+
+        // Each property is read once so that the null check and the value used cannot disagree.
+        String name = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_NAME);
+        if (name != null && name.length() > 0) {
+            bulkProcessorBuilder.setName(name);
+        }
+
+        String concurrentRequestsStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_CONCURRENTREQUEST);
+        if (concurrentRequestsStr != null) {
+            int concurrentRequests = Integer.parseInt(concurrentRequestsStr.trim());
+            // Values of 1 or less are ignored and the builder keeps its own setting.
+            // NOTE(review): this makes it impossible to configure 0 - confirm that this is intended.
+            if (concurrentRequests > 1) {
+                bulkProcessorBuilder.setConcurrentRequests(concurrentRequests);
+            }
+        }
+
+        String bulkActionsStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKACTIONS);
+        if (bulkActionsStr != null) {
+            bulkProcessorBuilder.setBulkActions(Integer.parseInt(bulkActionsStr.trim()));
+        }
+
+        String bulkSizeStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE);
+        if (bulkSizeStr != null) {
+            bulkProcessorBuilder.setBulkSize(ByteSizeValue.parseBytesSizeValue(bulkSizeStr, new ByteSizeValue(5, ByteSizeUnit.MB), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BULKSIZE));
+        }
+
+        String flushIntervalStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL);
+        if (flushIntervalStr != null) {
+            bulkProcessorBuilder.setFlushInterval(TimeValue.parseTimeValue(flushIntervalStr, null, CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_FLUSHINTERVAL));
+        } else {
+            // in ElasticSearch this defaults to null, but we would like to set a value to 5 seconds by default
+            bulkProcessorBuilder.setFlushInterval(new TimeValue(5, TimeUnit.SECONDS));
+        }
+
+        String backoffPolicyStr = System.getProperty(CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY);
+        if (backoffPolicyStr != null && backoffPolicyStr.length() > 0) {
+            BackoffPolicy backoffPolicy = parseBackoffPolicy(backoffPolicyStr);
+            if (backoffPolicy != null) {
+                bulkProcessorBuilder.setBackoffPolicy(backoffPolicy);
+            }
+        }
+
+        bulkProcessor = bulkProcessorBuilder.build();
+        return bulkProcessor;
+    }
+
+    /**
+     * Converts the textual back-off policy setting into a {@link BackoffPolicy}.
+     *
+     * @param backoffPolicyStr the non-empty property value, matched case-insensitively
+     * @return the matching policy, or <code>null</code> if the policy name is not recognized
+     * @throws IllegalArgumentException if an argument list is present but is not <code>(delay,maxNumberOfRetries)</code>
+     * @throws NumberFormatException if the number of retries is not an integer
+     */
+    private BackoffPolicy parseBackoffPolicy(String backoffPolicyStr) {
+        // Locale.ROOT keeps the keyword matching independent of the JVM default locale.
+        String policy = backoffPolicyStr.trim().toLowerCase(Locale.ROOT);
+        if ("nobackoff".equals(policy)) {
+            return BackoffPolicy.noBackoff();
+        }
+        boolean constant = policy.startsWith("constant(");
+        if (!constant && !policy.startsWith("exponential")) {
+            logger.warn("Ignoring unrecognized value '" + backoffPolicyStr + "' for " + CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY);
+            return null;
+        }
+        int parenthesisPos = policy.indexOf('(');
+        if (parenthesisPos < 0) {
+            // Only reachable for "exponential" without arguments.
+            return BackoffPolicy.exponentialBackoff();
+        }
+        // The arguments start right after the opening parenthesis. Searching for the keyword
+        // concatenated with its own length (as done previously) never matches and yields -1.
+        int paramStartPos = parenthesisPos + 1;
+        int paramEndPos = policy.indexOf(')', paramStartPos);
+        int paramSeparatorPos = policy.indexOf(',', paramStartPos);
+        if (paramSeparatorPos < 0 || paramEndPos < paramSeparatorPos) {
+            throw new IllegalArgumentException("Invalid value '" + backoffPolicyStr + "' for "
+                    + CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY
+                    + ", expected <policy>(<delay>,<maxNumberOfRetries>)");
+        }
+        TimeValue delay = TimeValue.parseTimeValue(policy.substring(paramStartPos, paramSeparatorPos).trim(), new TimeValue(5, TimeUnit.SECONDS), CONTEXTSERVER_ELASTICSEARCH_BULKPROCESSOR_BACKOFFPOLICY);
+        int maxNumberOfRetries = Integer.parseInt(policy.substring(paramSeparatorPos + 1, paramEndPos).trim());
+        if (constant) {
+            return BackoffPolicy.constantBackoff(delay, maxNumberOfRetries);
+        }
+        return BackoffPolicy.exponentialBackoff(delay, maxNumberOfRetries);
+    }
+
+
public void stop() {
new InClassLoaderExecute<Object>() {
protected Object execute(Object... args) {
logger.info("Closing ElasticSearch persistence backend...");
+ if (bulkProcessor != null) {
+ bulkProcessor.close();
+ }
node.close();
return null;
}
@@ -605,9 +712,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) :
(itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
- client.prepareUpdate(index, itemType, itemId).setDoc(source)
- .execute()
- .actionGet();
+ if (bulkProcessor == null) {
+ client.prepareUpdate(index, itemType, itemId).setDoc(source)
+ .execute()
+ .actionGet();
+ } else {
+ UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setDoc(source).request();
+ bulkProcessor.add(updateRequest);
+ }
return true;
} catch (IndexNotFoundException e) {
logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
@@ -632,9 +744,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService,
(itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName);
Script actualScript = new Script(script, ScriptService.ScriptType.INLINE, null, scriptParams);
- client.prepareUpdate(index, itemType, itemId).setScript(actualScript)
- .execute()
- .actionGet();
+ if (bulkProcessor == null) {
+ client.prepareUpdate(index, itemType, itemId).setScript(actualScript)
+ .execute()
+ .actionGet();
+ } else {
+ UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).request();
+ bulkProcessor.add(updateRequest);
+ }
return true;
} catch (IndexNotFoundException e) {
logger.debug("No index found for itemType=" + clazz.getName() + "itemId=" + itemId, e);
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/285d4cc6/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
----------------------------------------------------------------------
diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
index 7263b28..06b83b0 100644
--- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
+++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java
@@ -812,7 +812,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
}
}
- logger.info("Profiles past condition updated in {}", System.currentTimeMillis()-t);
+ logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis()-t);
}
private void updateExistingProfilesForSegment(Segment segment) {
@@ -854,7 +854,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
persistenceService.update(profileToRemove.getItemId(), null, Profile.class, "segments", profileToRemove.getSegments());
}
}
- logger.info("Profiles updated in {}", System.currentTimeMillis()-t);
+ logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
}
private void updateExistingProfilesForScoring(Scoring scoring) {
@@ -888,7 +888,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
eventService.send(entries.next().getValue());
}
}
- logger.info("Profiles updated in {}", System.currentTimeMillis()-t);
+ logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
}
private void updateExistingProfilesForRemovedScoring(String scoringId) {
@@ -905,7 +905,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList
for (Profile profileToRemove : previousProfiles) {
persistenceService.updateWithScript(profileToRemove.getItemId(), null, Profile.class, "ctx._source.scores.remove(scoringId)", scriptParams);
}
- logger.info("Profiles updated in {}", System.currentTimeMillis()-t);
+ logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t);
}
private String getMD5(String md5) {