You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/07/29 21:21:44 UTC

samza git commit: SAMZA-741: Add support for versioning to Elasticsearch System Producer

Repository: samza
Updated Branches:
  refs/heads/master b75d55bff -> bf094c8e5


SAMZA-741: Add support for versioning to Elasticsearch System Producer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bf094c8e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bf094c8e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bf094c8e

Branch: refs/heads/master
Commit: bf094c8e5c4038b158811f51e8ca523bb7c1b182
Parents: b75d55b
Author: Roger Hoover <ro...@gmail.com>
Authored: Wed Jul 29 11:49:20 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Wed Jul 29 12:21:14 2015 -0700

----------------------------------------------------------------------
 .../ElasticsearchSystemProducer.java            | 58 +++++++++++++------
 .../ElasticsearchSystemProducerMetrics.java     |  2 +
 .../DefaultIndexRequestFactory.java             | 60 +++++++++++++++++---
 .../ElasticsearchSystemProducerMetricsTest.java |  4 +-
 .../ElasticsearchSystemProducerTest.java        | 44 ++++++++++++--
 5 files changed, 140 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/bf094c8e/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
index f61bd36..7672ee8 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
@@ -31,6 +31,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.rest.RestStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,20 +106,59 @@ public class ElasticsearchSystemProducer implements SystemProducer {
 
         @Override
         public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+          boolean hasFatalError = false;
+          // Version conflicts are not errors: a write carrying an old version is simply skipped.
           if (response.hasFailures()) {
+            for (BulkItemResponse itemResp : response.getItems()) {
+              if (itemResp.isFailed()) {
+                if (itemResp.getFailure().getStatus() == RestStatus.CONFLICT) {
+                  LOGGER.info("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+                } else {
+                  hasFatalError = true;
+                  LOGGER.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
+                }
+              }
+            }
+          }
+          if (hasFatalError) {
             sendFailed.set(true);
           } else {
             updateSuccessMetrics(response);
-            LOGGER.info(String.format("Written %s messages from %s to %s.",
-                                      response.getItems().length, source, system));
           }
         }
 
         @Override
         public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+          LOGGER.error("Failed to send bulk request to Elasticsearch", failure); // keep the stack trace
           thrown.compareAndSet(null, failure);
           sendFailed.set(true);
         }
+
+        private void updateSuccessMetrics(BulkResponse response) {
+          metrics.bulkSendSuccess.inc(); // only reached when the batch had no non-conflict failures
+          int writes = 0;
+          for (BulkItemResponse itemResp: response.getItems()) {
+            if (itemResp.isFailed()) {
+              if (itemResp.getFailure().getStatus() == RestStatus.CONFLICT) {
+                metrics.conflicts.inc();
+              }
+            } else {
+              ActionResponse resp = itemResp.getResponse();
+              if (resp instanceof IndexResponse) {
+                writes += 1;
+                if (((IndexResponse) resp).isCreated()) {
+                  metrics.inserts.inc();
+                } else {
+                  metrics.updates.inc();
+                }
+              } else {
+                String type = (resp == null) ? "null" : resp.getClass().getSimpleName();
+                LOGGER.error("Unexpected Elasticsearch action response type: " + type);
+              }
+            }
+          }
+          LOGGER.info(String.format("Wrote %s messages from %s to %s.", writes, source, system));
+        }
     };
 
     sourceBulkProcessor.put(source, bulkProcessorFactory.getBulkProcessor(client, listener));
@@ -150,18 +190,4 @@ public class ElasticsearchSystemProducer implements SystemProducer {
     LOGGER.info(String.format("Flushed %s to %s.", source, system));
   }
 
-  private void updateSuccessMetrics(BulkResponse response) {
-    metrics.bulkSendSuccess.inc();
-    for (BulkItemResponse itemResp: response.getItems()) {
-        ActionResponse resp = itemResp.getResponse();
-        if (resp instanceof IndexResponse) {
-          if (((IndexResponse)resp).isCreated()) {
-            metrics.inserts.inc();
-          }
-          else {
-            metrics.updates.inc();
-          }
-        }
-      }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/bf094c8e/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
index e3b635b..5a46cba 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetrics.java
@@ -26,6 +26,7 @@ public class ElasticsearchSystemProducerMetrics extends MetricsBase {
     public final Counter bulkSendSuccess;
     public final Counter inserts;
     public final Counter updates;
+    public final Counter conflicts;
 
     public ElasticsearchSystemProducerMetrics(String systemName, MetricsRegistry registry) {
         super(systemName + "-", registry);
@@ -33,5 +34,6 @@ public class ElasticsearchSystemProducerMetrics extends MetricsBase {
         bulkSendSuccess = newCounter("bulk-send-success");
         inserts = newCounter("docs-inserted");
         updates = newCounter("docs-updated");
+        conflicts = newCounter("version-conflicts");
     }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/bf094c8e/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
index afe0eee..ddac22f 100644
--- a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
@@ -22,6 +22,9 @@ package org.apache.samza.system.elasticsearch.indexrequest;
 import org.apache.samza.SamzaException;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.common.base.Optional;
+import org.elasticsearch.index.VersionType;
 
 import java.util.Map;
 
@@ -53,24 +56,68 @@ public class DefaultIndexRequestFactory implements IndexRequestFactory {
 
   @Override
   public IndexRequest getIndexRequest(OutgoingMessageEnvelope envelope) {
+    IndexRequest indexRequest = getRequest(envelope);
+    // Each optional field below is applied only when its protected hook returns a value.
+    Optional<String> id = getId(envelope);
+    if (id.isPresent()) {
+      indexRequest.id(id.get());
+    }
+
+    Optional<String> routingKey = getRoutingKey(envelope);
+    if (routingKey.isPresent()) {
+      indexRequest.routing(routingKey.get());
+    }
+
+    Optional<Long> version = getVersion(envelope); // absent unless getVersion is overridden
+    if (version.isPresent()) {
+      indexRequest.version(version.get());
+    }
+
+    Optional<VersionType> versionType = getVersionType(envelope); // absent unless overridden
+    if (versionType.isPresent()) {
+      indexRequest.versionType(versionType.get());
+    }
+
+    setSource(envelope, indexRequest); // document body from the message payload
+
+    return indexRequest;
+  }
+
+  protected IndexRequest getRequest(OutgoingMessageEnvelope envelope) {
     String[] parts = envelope.getSystemStream().getStream().split("/");
     if (parts.length != 2) {
       throw new SamzaException("Elasticsearch stream name must match pattern {index}/{type}");
     }
     String index = parts[0];
     String type = parts[1];
-    IndexRequest indexRequest = new IndexRequest(index, type);
+    return Requests.indexRequest(index).type(type); // bare request; getIndexRequest fills in the rest
+  }
 
+  protected Optional<String> getId(OutgoingMessageEnvelope envelope) {
     Object id = envelope.getKey();
-    if (id != null) {
-      indexRequest.id(id.toString());
+    if (id == null) {
+      return Optional.absent(); // no message key: no explicit document id is set
     }
+    return Optional.of(id.toString()); // the message key, as a string, is the document id
+  }
 
+  protected Optional<String> getRoutingKey(OutgoingMessageEnvelope envelope) {
     Object partitionKey = envelope.getPartitionKey();
-    if (partitionKey != null) {
-      indexRequest.routing(partitionKey.toString());
+    if (partitionKey == null) {
+      return Optional.absent(); // no partition key: no explicit routing value is set
     }
+    return Optional.of(partitionKey.toString()); // the partition key, as a string, is the routing value
+  }
+
+  protected Optional<Long> getVersion(OutgoingMessageEnvelope envelope) {
+    return Optional.absent(); // default: no explicit version; override to supply one per message
+  }
 
+  protected Optional<VersionType> getVersionType(OutgoingMessageEnvelope envelope) {
+    return Optional.absent(); // default: the request's version type is left unchanged
+  }
+
+  protected void setSource(OutgoingMessageEnvelope envelope, IndexRequest indexRequest) {
     Object message = envelope.getMessage();
     if (message instanceof byte[]) {
       indexRequest.source((byte[]) message);
@@ -79,7 +126,6 @@ public class DefaultIndexRequestFactory implements IndexRequestFactory {
     } else {
       throw new SamzaException("Unsupported message type: " + message.getClass().getCanonicalName());
     }
-
-    return indexRequest;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/bf094c8e/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
index 980964f..5554705 100644
--- a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerMetricsTest.java
@@ -38,16 +38,18 @@ public class ElasticsearchSystemProducerMetricsTest {
         metrics.bulkSendSuccess.inc(29L);
         metrics.inserts.inc();
         metrics.updates.inc(7L);
+        metrics.conflicts.inc(3L);
 
         Set<String> groups = registry.getGroups();
         assertEquals(1, groups.size());
         assertEquals(GRP_NAME, groups.toArray()[0]);
 
         Map<String, Metric> metricMap = registry.getGroup(GRP_NAME);
-        assertEquals(3, metricMap.size());
+        assertEquals(4, metricMap.size());
         assertEquals(29L, ((Counter) metricMap.get("es-bulk-send-success")).getCount());
         assertEquals(1L, ((Counter) metricMap.get("es-docs-inserted")).getCount());
         assertEquals(7L, ((Counter) metricMap.get("es-docs-updated")).getCount());
+        assertEquals(3L, ((Counter) metricMap.get("es-version-conflicts")).getCount());
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/bf094c8e/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
index 684d7f6..992ef0a 100644
--- a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
@@ -24,14 +24,17 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.rest.RestStatus;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -44,20 +47,21 @@ public class ElasticsearchSystemProducerTest {
   private static final BulkProcessorFactory BULK_PROCESSOR_FACTORY = mock(BulkProcessorFactory.class);
   private static final Client CLIENT = mock(Client.class);
   private static final IndexRequestFactory INDEX_REQUEST_FACTORY = mock(IndexRequestFactory.class);
-  private static final ElasticsearchSystemProducerMetrics METRICS = new ElasticsearchSystemProducerMetrics("es", new MetricsRegistryMap());
   public static final String SOURCE_ONE = "one";
   public static final String SOURCE_TWO = "two";
   private SystemProducer producer;
   public static BulkProcessor processorOne;
   public static BulkProcessor processorTwo;
+  private ElasticsearchSystemProducerMetrics metrics;
 
   @Before
   public void setUp() throws Exception {
+    metrics = new ElasticsearchSystemProducerMetrics("es", new MetricsRegistryMap());
     producer = new ElasticsearchSystemProducer(SYSTEM_NAME,
                                                BULK_PROCESSOR_FACTORY,
                                                CLIENT,
                                                INDEX_REQUEST_FACTORY,
-                                               METRICS);
+                                               metrics);
 
     processorOne = mock(BulkProcessor.class);
     processorTwo = mock(BulkProcessor.class);
@@ -127,11 +131,43 @@ public class ElasticsearchSystemProducerTest {
         .thenReturn(processorOne);
     producer.register(SOURCE_ONE);
 
-    BulkResponse response = mock(BulkResponse.class);
-    when(response.hasFailures()).thenReturn(true);
+    BulkResponse response = getRespWithFailedDocument(RestStatus.BAD_REQUEST);
 
     listenerCaptor.getValue().afterBulk(0, null, response);
 
     producer.flush(SOURCE_ONE);
   }
+
+  @Test
+  public void testIgnoreVersionConflicts() throws Exception {
+    ArgumentCaptor<BulkProcessor.Listener> listenerCaptor =
+            ArgumentCaptor.forClass(BulkProcessor.Listener.class);
+
+    when(BULK_PROCESSOR_FACTORY.getBulkProcessor(eq(CLIENT), listenerCaptor.capture()))
+            .thenReturn(processorOne);
+    producer.register(SOURCE_ONE);
+
+    BulkResponse response = getRespWithFailedDocument(RestStatus.CONFLICT);
+    listenerCaptor.getValue().afterBulk(0, null, response);
+
+    assertEquals(1, metrics.conflicts.getCount());
+    assertEquals(1, metrics.bulkSendSuccess.getCount()); // a conflict-only batch is not a failure
+    producer.flush(SOURCE_ONE); // must not throw: version conflicts are not fatal
+  }
+
+  private BulkResponse getRespWithFailedDocument(RestStatus status) {
+    // A single bulk item that failed with the given REST status.
+    BulkItemResponse.Failure itemFailure = mock(BulkItemResponse.Failure.class);
+    when(itemFailure.getStatus()).thenReturn(status);
+    BulkItemResponse failedItem = mock(BulkItemResponse.class);
+    when(failedItem.isFailed()).thenReturn(true);
+    when(failedItem.getFailure()).thenReturn(itemFailure);
+
+    // A bulk response that reports failures and contains only that item.
+    BulkResponse bulkResponse = mock(BulkResponse.class);
+    when(bulkResponse.hasFailures()).thenReturn(true);
+    when(bulkResponse.getItems()).thenReturn(new BulkItemResponse[] {failedItem});
+
+    return bulkResponse;
+  }
 }