You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/07/29 21:21:44 UTC
samza git commit: SAMZA-741: Add support for versioning to Elasticsearch System Producer
Repository: samza
Updated Branches:
refs/heads/master b75d55bff -> bf094c8e5
SAMZA-741: Add support for versioning to Elasticsearch System Producer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bf094c8e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bf094c8e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bf094c8e
Branch: refs/heads/master
Commit: bf094c8e5c4038b158811f51e8ca523bb7c1b182
Parents: b75d55b
Author: Roger Hoover <ro...@gmail.com>
Authored: Wed Jul 29 11:49:20 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Wed Jul 29 12:21:14 2015 -0700
----------------------------------------------------------------------
.../ElasticsearchSystemProducer.java | 58 +++++++++++++------
.../ElasticsearchSystemProducerMetrics.java | 2 +
.../DefaultIndexRequestFactory.java | 60 +++++++++++++++++---
.../ElasticsearchSystemProducerMetricsTest.java | 4 +-
.../ElasticsearchSystemProducerTest.java | 44 ++++++++++++--
5 files changed, 140 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/bf094c8e/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
index f61bd36..7672ee8 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
@@ -31,6 +31,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
+import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,20 +106,59 @@ public class ElasticsearchSystemProducer implements SystemProducer {
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+ boolean hasFatalError = false;
+ // Version conflicts (HTTP 409) are not errors: the rejected write carried an old version and is ignored.
if (response.hasFailures()) {
+ for (BulkItemResponse itemResp : response.getItems()) {
+ if (itemResp.isFailed()) {
+ if (RestStatus.CONFLICT == itemResp.getFailure().getStatus()) {
+ LOGGER.info("Ignoring version conflict in Elasticsearch: {}", itemResp.getFailureMessage());
+ } else {
+ hasFatalError = true; // any non-conflict item failure marks the send as failed
+ LOGGER.error("Failed to index document in Elasticsearch: {}", itemResp.getFailureMessage());
+ }
+ }
+ }
+ }
+ if (hasFatalError) {
sendFailed.set(true);
} else {
updateSuccessMetrics(response);
- LOGGER.info(String.format("Written %s messages from %s to %s.",
- response.getItems().length, source, system));
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ LOGGER.error("Bulk request to Elasticsearch failed", failure); // pass the Throwable so the stack trace is logged
thrown.compareAndSet(null, failure);
sendFailed.set(true);
}
+ /** Updates the success, insert, update and version-conflict counters for a batch with no fatal failures. */
+ private void updateSuccessMetrics(BulkResponse response) {
+ metrics.bulkSendSuccess.inc();
+ int writes = 0;
+ for (BulkItemResponse itemResp: response.getItems()) {
+ if (itemResp.isFailed()) {
+ if (RestStatus.CONFLICT == itemResp.getFailure().getStatus()) {
+ metrics.conflicts.inc();
+ }
+ } else {
+ ActionResponse resp = itemResp.getResponse();
+ if (resp instanceof IndexResponse) {
+ writes++;
+ if (((IndexResponse) resp).isCreated()) {
+ metrics.inserts.inc();
+ } else {
+ metrics.updates.inc();
+ }
+ } else {
+ LOGGER.error("Unexpected Elasticsearch action response type: {}", resp == null ? "null" : resp.getClass().getSimpleName());
+ }
+ }
+ }
+ LOGGER.info("Wrote {} messages from {} to {}.",
+ writes, source, system);
+ }
};
sourceBulkProcessor.put(source, bulkProcessorFactory.getBulkProcessor(client, listener));
@@ -150,18 +190,4 @@ public class ElasticsearchSystemProducer implements SystemProducer {
LOGGER.info(String.format("Flushed %s to %s.", source, system));
}
- private void updateSuccessMetrics(BulkResponse response) {
- metrics.bulkSendSuccess.inc();
- for (BulkItemResponse itemResp: response.getItems()) {
- ActionResponse resp = itemResp.getResponse();
- if (resp instanceof IndexResponse) {
- if (((IndexResponse)resp).isCreated()) {
- metrics.inserts.inc();
- }
- else {
- metrics.updates.inc();
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/bf094c8e/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
index e3b635b..5a46cba 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
@@ -26,6 +26,7 @@ public class ElasticsearchSystemProducerMetrics extends MetricsBase {
public final Counter bulkSendSuccess;
public final Counter inserts;
public final Counter updates;
+ public final Counter conflicts;
public ElasticsearchSystemProducerMetrics(String systemName, MetricsRegistry registry) {
super(systemName + "-", registry);
@@ -33,5 +34,6 @@ public class ElasticsearchSystemProducerMetrics extends MetricsBase {
bulkSendSuccess = newCounter("bulk-send-success");
inserts = newCounter("docs-inserted");
updates = newCounter("docs-updated");
+ conflicts = newCounter("version-conflicts");
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/bf094c8e/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
index afe0eee..ddac22f 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
@@ -22,6 +22,9 @@ package org.apache.samza.system.elasticsearch.indexrequest;
import org.apache.samza.SamzaException;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.base.Optional;
+import org.elasticsearch.index.VersionType;
import java.util.Map;
@@ -53,24 +56,68 @@ public class DefaultIndexRequestFactory implements IndexRequestFactory {
@Override
public IndexRequest getIndexRequest(OutgoingMessageEnvelope envelope) {
+ IndexRequest indexRequest = getRequest(envelope); // index/type parsed from the stream name
+ // Document id (default getId: envelope key). Each attribute is set only when its hook returns a present value.
+ Optional<String> id = getId(envelope);
+ if (id.isPresent()) {
+ indexRequest.id(id.get());
+ }
+ // Routing key; the default getRoutingKey uses the envelope's partition key.
+ Optional<String> routingKey = getRoutingKey(envelope);
+ if (routingKey.isPresent()) {
+ indexRequest.routing(routingKey.get());
+ }
+ // Document version; the default getVersion returns absent, so requests are unversioned unless overridden.
+ Optional<Long> version = getVersion(envelope);
+ if (version.isPresent()) {
+ indexRequest.version(version.get());
+ }
+ // How Elasticsearch interprets the version; the default getVersionType returns absent.
+ Optional<VersionType> versionType = getVersionType(envelope);
+ if (versionType.isPresent()) {
+ indexRequest.versionType(versionType.get());
+ }
+ // Document body is taken from the envelope message.
+ setSource(envelope, indexRequest);
+
+ return indexRequest;
+ }
+ /** Builds the base request from a stream named {index}/{type}; throws SamzaException for any other shape. */
+ protected IndexRequest getRequest(OutgoingMessageEnvelope envelope) {
String[] parts = envelope.getSystemStream().getStream().split("/");
if (parts.length != 2) {
throw new SamzaException("Elasticsearch stream name must match pattern {index}/{type}");
}
String index = parts[0];
String type = parts[1];
- IndexRequest indexRequest = new IndexRequest(index, type);
+ return Requests.indexRequest(index).type(type);
+ }
+ protected Optional<String> getId(OutgoingMessageEnvelope envelope) { // document id = envelope key; absent when the key is null
Object id = envelope.getKey();
- if (id != null) {
- indexRequest.id(id.toString());
+ if (id == null) {
+ return Optional.absent();
}
+ return Optional.of(id.toString());
+ }
+ protected Optional<String> getRoutingKey(OutgoingMessageEnvelope envelope) { // routing = envelope partition key; absent when null
Object partitionKey = envelope.getPartitionKey();
- if (partitionKey != null) {
- indexRequest.routing(partitionKey.toString());
+ if (partitionKey == null) {
+ return Optional.absent();
}
+ return Optional.of(partitionKey.toString());
+ }
+ /** Hook for a per-document version; absent by default, so no version is set on the request. */
+ protected Optional<Long> getVersion(OutgoingMessageEnvelope envelope) {
+ return Optional.absent();
+ }
+ protected Optional<VersionType> getVersionType(OutgoingMessageEnvelope envelope) { // hook for the VersionType; absent by default
+ return Optional.absent();
+ }
+
+ protected void setSource(OutgoingMessageEnvelope envelope, IndexRequest indexRequest) {
Object message = envelope.getMessage();
if (message instanceof byte[]) {
indexRequest.source((byte[]) message);
@@ -79,7 +126,6 @@ public class DefaultIndexRequestFactory implements IndexRequestFactory {
} else {
throw new SamzaException("Unsupported message type: " + message.getClass().getCanonicalName());
}
-
- return indexRequest;
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/bf094c8e/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
index 980964f..5554705 100644
--- a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
@@ -38,16 +38,18 @@ public class ElasticsearchSystemProducerMetricsTest {
metrics.bulkSendSuccess.inc(29L);
metrics.inserts.inc();
metrics.updates.inc(7L);
+ metrics.conflicts.inc(3L);
Set<String> groups = registry.getGroups();
assertEquals(1, groups.size());
assertEquals(GRP_NAME, groups.toArray()[0]);
Map<String, Metric> metricMap = registry.getGroup(GRP_NAME);
- assertEquals(3, metricMap.size());
+ assertEquals(4, metricMap.size());
assertEquals(29L, ((Counter) metricMap.get("es-bulk-send-success")).getCount());
assertEquals(1L, ((Counter) metricMap.get("es-docs-inserted")).getCount());
assertEquals(7L, ((Counter) metricMap.get("es-docs-updated")).getCount());
+ assertEquals(3L, ((Counter) metricMap.get("es-version-conflicts")).getCount());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/bf094c8e/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
index 684d7f6..992ef0a 100644
--- a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
@@ -24,14 +24,17 @@ import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
+import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
+import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -44,20 +47,21 @@ public class ElasticsearchSystemProducerTest {
private static final BulkProcessorFactory BULK_PROCESSOR_FACTORY = mock(BulkProcessorFactory.class);
private static final Client CLIENT = mock(Client.class);
private static final IndexRequestFactory INDEX_REQUEST_FACTORY = mock(IndexRequestFactory.class);
- private static final ElasticsearchSystemProducerMetrics METRICS = new ElasticsearchSystemProducerMetrics("es", new MetricsRegistryMap());
public static final String SOURCE_ONE = "one";
public static final String SOURCE_TWO = "two";
private SystemProducer producer;
public static BulkProcessor processorOne;
public static BulkProcessor processorTwo;
+ private ElasticsearchSystemProducerMetrics metrics;
@Before
public void setUp() throws Exception {
+ metrics = new ElasticsearchSystemProducerMetrics("es", new MetricsRegistryMap());
producer = new ElasticsearchSystemProducer(SYSTEM_NAME,
BULK_PROCESSOR_FACTORY,
CLIENT,
INDEX_REQUEST_FACTORY,
- METRICS);
+ metrics);
processorOne = mock(BulkProcessor.class);
processorTwo = mock(BulkProcessor.class);
@@ -127,11 +131,43 @@ public class ElasticsearchSystemProducerTest {
.thenReturn(processorOne);
producer.register(SOURCE_ONE);
- BulkResponse response = mock(BulkResponse.class);
- when(response.hasFailures()).thenReturn(true);
+ BulkResponse response = getRespWithFailedDocument(RestStatus.BAD_REQUEST);
listenerCaptor.getValue().afterBulk(0, null, response);
producer.flush(SOURCE_ONE);
}
+
+ @Test
+ public void testIgnoreVersionConflicts() throws Exception {
+ ArgumentCaptor<BulkProcessor.Listener> listenerCaptor =
+ ArgumentCaptor.forClass(BulkProcessor.Listener.class);
+
+ when(BULK_PROCESSOR_FACTORY.getBulkProcessor(eq(CLIENT), listenerCaptor.capture()))
+ .thenReturn(processorOne);
+ producer.register(SOURCE_ONE);
+
+ BulkResponse response = getRespWithFailedDocument(RestStatus.CONFLICT);
+ // A batch whose only item failed with a version conflict: counted as a conflict, not as a write.
+ listenerCaptor.getValue().afterBulk(0, null, response);
+ assertEquals(1, metrics.conflicts.getCount());
+ assertEquals(0, metrics.inserts.getCount() + metrics.updates.getCount());
+ producer.flush(SOURCE_ONE); // expected not to throw: no fatal failure was recorded
+ }
+ /** Mocks a BulkResponse reporting failures, with a single failed item whose failure has the given status. */
+ private BulkResponse getRespWithFailedDocument(RestStatus status) {
+ BulkResponse response = mock(BulkResponse.class);
+ when(response.hasFailures()).thenReturn(true);
+ // The only item: marked failed, with a Failure stubbed to report the requested status.
+ BulkItemResponse itemResp = mock(BulkItemResponse.class);
+ when(itemResp.isFailed()).thenReturn(true);
+ BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
+ when(failure.getStatus()).thenReturn(status);
+ when(itemResp.getFailure()).thenReturn(failure);
+ BulkItemResponse[] itemResponses = new BulkItemResponse[]{itemResp};
+
+ when(response.getItems()).thenReturn(itemResponses);
+
+ return response;
+ }
}