You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/07/10 23:19:00 UTC

samza git commit: SAMZA-654: added Elasticsearch Producer

Repository: samza
Updated Branches:
  refs/heads/master 99dbca19d -> fa6ca0b14


SAMZA-654: added Elasticsearch Producer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa6ca0b1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa6ca0b1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa6ca0b1

Branch: refs/heads/master
Commit: fa6ca0b1466099a289a04b5fd5674fffbcca5ca3
Parents: 99dbca1
Author: Dan Harvey <da...@gmail.com>
Authored: Fri Jul 10 14:18:36 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Fri Jul 10 14:18:36 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |  16 ++
 .../versioned/jobs/configuration-table.html     | 107 ++++++++++++++
 gradle/dependency-versions.gradle               |   1 +
 .../samza/config/ElasticsearchConfig.java       | 132 +++++++++++++++++
 .../elasticsearch/BulkProcessorFactory.java     |  58 ++++++++
 .../elasticsearch/ElasticsearchSystemAdmin.java |  65 +++++++++
 .../ElasticsearchSystemFactory.java             |  91 ++++++++++++
 .../ElasticsearchSystemProducer.java            | 145 +++++++++++++++++++
 .../elasticsearch/client/ClientFactory.java     |  31 ++++
 .../elasticsearch/client/NodeClientFactory.java |  60 ++++++++
 .../client/TransportClientFactory.java          |  76 ++++++++++
 .../DefaultIndexRequestFactory.java             |  85 +++++++++++
 .../indexrequest/IndexRequestFactory.java       |  34 +++++
 .../samza/config/ElasticsearchConfigTest.java   | 117 +++++++++++++++
 .../ElasticsearchSystemProducerTest.java        | 134 +++++++++++++++++
 .../DefaultIndexRequestFactoryTest.java         | 118 +++++++++++++++
 settings.gradle                                 |  26 ++--
 17 files changed, 1287 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index a5f5410..0852adc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -157,6 +157,22 @@ project(":samza-core_$scalaVersion") {
   }
 }
 
+project(':samza-elasticsearch') {
+  apply plugin: 'java'
+
+  dependencies {
+    compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
+    compile "org.elasticsearch:elasticsearch:$elasticsearchVersion"
+    compile "org.slf4j:slf4j-api:$slf4jVersion"
+    testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
+
+    // SLF4J binding needed only at test runtime, so that log output from tests is visible.
+    testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
+  }
+}
+
 project(":samza-kafka_$scalaVersion") {
   apply plugin: 'scala'
 

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 405e2ce..cd7ea8d 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -638,6 +638,113 @@
                 </tr>
 
                 <tr>
+                    <th colspan="3" class="section" id="elasticsearch">
+                        Using <a href="https://github.com/elastic/elasticsearch">Elasticsearch</a> for output streams<br>
+                        <span class="subtitle">
+                            (This section applies if you have set
+                            <a href="#systems-samza-factory" class="property">systems.*.samza.factory</a>
+                            <code>= org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory</code>)
+                        </span>
+                    </th>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-client-factory-class">systems.<span class="system">system-name</span>.<br>client.factory</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        <strong>Required:</strong> The elasticsearch client factory used for connecting
+                        to the Elasticsearch cluster. Samza ships with the following implementations:
+                        <dl>
+                            <dt><code>org.apache.samza.system.elasticsearch.client.TransportClientFactory</code></dt>
+                            <dd>Creates a TransportClient that connects to the cluster remotely without
+                                joining it. This requires the transport host and port properties to be set.</dd>
+                            <dt><code>org.apache.samza.system.elasticsearch.client.NodeClientFactory</code></dt>
+                            <dd>Creates a Node client that connects to the cluster by joining it. By default
+                            this uses <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-zen.html">zen discovery</a> to find the cluster but other methods can be configured.</dd>
+                        </dl>
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-index-request-factory-class">systems.<span class="system">system-name</span>.<br>index.request.factory</td>
+                    <td class="default">org.apache.samza.system<br>.elasticsearch.indexrequest.<br>DefaultIndexRequestFactory</td>
+                    <td class="description">
+                        The index request factory that converts the Samza OutgoingMessageEnvelope into the IndexRequest
+                        to be sent to Elasticsearch. The default IndexRequestFactory behaves as follows:
+                        <dl>
+                            <dt><code>Stream name</code></dt>
+                            <dd>The stream name is of the format {index-name}/{type-name} which
+                            map on to the elasticsearch index and type.</dd>
+                            <dt><code>Message id</code></dt>
+                            <dd>If the message has a key this is set as the document id, otherwise Elasticsearch will generate one for each document.</dd>
+                            <dt><code>Partition id</code></dt>
+                            <dd>If the partition key is set then this is used as the Elasticsearch routing key.</dd>
+                            <dt><code>Message</code></dt>
+                            <dd>The message must be either a byte[] which is passed directly on to Elasticsearch, or a Map which is passed on to the
+                            Elasticsearch client which serialises it into a JSON String. Samza serdes are not currently supported.</dd>
+                        </dl>
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-client-host">systems.<span class="system">system-name</span>.<br>client.transport.host</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        <strong>Required</strong> for <code>TransportClientFactory</code>
+                        <p>The hostname that the TransportClientFactory connects to.</p>
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-client-port">systems.<span class="system">system-name</span>.<br>client.transport.port</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        <strong>Required</strong> for <code>TransportClientFactory</code>
+                        <p>The port that the TransportClientFactory connects to.</p>
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-client-settings">systems.<span class="system">system-name</span>.<br>client.elasticsearch.*</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        Any <a href="http://www.elastic.co/guide/en/elasticsearch/client/java-api/1.x/client.html">Elasticsearch client settings</a> can be used here. They will all be passed to both the transport and node clients.
+                        Some of the common settings you will want to provide are:
+                        <dl>
+                            <dt><code>systems.<span class="system">system-name</span>.client.elasticsearch.cluster.name</code></dt>
+                            <dd>The name of the Elasticsearch cluster the client is connecting to.</dd>
+                            <dt><code>systems.<span class="system">system-name</span>.client.elasticsearch.client.transport.sniff</code></dt>
+                            <dd>If set to <code>true</code> then the transport client will discover and keep
+                                up to date all cluster nodes. This is used for load balancing and fail-over on retries.</dd>
+                        </dl>
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-bulk-flush-max-actions">systems.<span class="system">system-name</span>.<br>bulk.flush.max.actions</td>
+                    <td class="default">1000</td>
+                    <td class="description">
+                        The maximum number of messages to be buffered before flushing.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-bulk-flush-max-size-mb">systems.<span class="system">system-name</span>.<br>bulk.flush.max.size.mb</td>
+                    <td class="default">5</td>
+                    <td class="description">
+                        The maximum aggregate size, in megabytes, of the buffered messages before flushing.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="systems-samza-bulk-flush-interval-ms">systems.<span class="system">system-name</span>.<br>bulk.flush.interval.ms</td>
+                    <td class="default">never</td>
+                    <td class="description">
+                        How often buffered messages should be flushed.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="kafka">
                         Using <a href="http://kafka.apache.org/">Kafka</a> for input streams, output streams and checkpoints<br>
                         <span class="subtitle">

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index d79213f..fb06e8e 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -17,6 +17,7 @@
  * under the License.
  */
  ext {
+  elasticsearchVersion = "1.5.1"
   jodaTimeVersion = "2.2"
   joptSimpleVersion = "3.2"
   jacksonVersion = "1.8.5"

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
new file mode 100644
index 0000000..6091feb
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import org.apache.samza.SamzaException;
+import org.elasticsearch.common.base.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Samza {@link MapConfig} holding the {@code systems.<name>.} subset of a job's configuration,
+ * with typed accessors for the Elasticsearch client, index request and bulk processor settings.
+ */
+public class ElasticsearchConfig extends MapConfig {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);
+
+  public static final String CONFIG_KEY_CLIENT_FACTORY = "client.factory";
+  public static final String PREFIX_ELASTICSEARCH_SETTINGS = "client.elasticsearch.";
+
+  public static final String CONFIG_KEY_INDEX_REQUEST_FACTORY = "index.request.factory";
+
+  public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+  public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+  public static final String CONFIG_KEY_BULK_FLUSH_INTERVALS_MS = "bulk.flush.interval.ms";
+
+  public static final String CONFIG_KEY_CLIENT_TRANSPORT_HOST = "client.transport.host";
+  public static final String CONFIG_KEY_CLIENT_TRANSPORT_PORT = "client.transport.port";
+
+  /**
+   * Builds the config from the {@code systems.<name>.} subset of {@code config} and logs it.
+   */
+  public ElasticsearchConfig(String name, Config config) {
+    super(config.subset("systems." + name + "."));
+    logAllSettings(this);
+  }
+
+  /** Returns the client factory class name; throws {@link SamzaException} when it is not set. */
+  public String getClientFactoryClassName() {
+    if (!containsKey(CONFIG_KEY_CLIENT_FACTORY)) {
+      throw new SamzaException("You must specify a client factory class"
+                               + " for the Elasticsearch system.");
+    }
+    return get(CONFIG_KEY_CLIENT_FACTORY);
+  }
+
+  /** Returns the settings found under the {@code client.elasticsearch.} prefix. */
+  public Config getElasticseachSettings() {
+    return subset(PREFIX_ELASTICSEARCH_SETTINGS);
+  }
+
+  /** Returns the index request factory class name, or absent when it is not set. */
+  public Optional<String> getIndexRequestFactoryClassName() {
+    if (!containsKey(CONFIG_KEY_INDEX_REQUEST_FACTORY)) {
+      return Optional.absent();
+    }
+    return Optional.of(get(CONFIG_KEY_INDEX_REQUEST_FACTORY));
+  }
+
+  /** Returns the transport client host, or absent when it is not set. */
+  public Optional<String> getTransportHost() {
+    if (!containsKey(CONFIG_KEY_CLIENT_TRANSPORT_HOST)) {
+      return Optional.absent();
+    }
+    return Optional.of(get(CONFIG_KEY_CLIENT_TRANSPORT_HOST));
+  }
+
+  /** Returns the transport client port, or absent when it is not set. */
+  public Optional<Integer> getTransportPort() {
+    if (!containsKey(CONFIG_KEY_CLIENT_TRANSPORT_PORT)) {
+      return Optional.absent();
+    }
+    return Optional.of(getInt(CONFIG_KEY_CLIENT_TRANSPORT_PORT));
+  }
+
+  /** Returns the bulk processor's maximum number of actions, or absent when it is not set. */
+  public Optional<Integer> getBulkFlushMaxActions() {
+    if (!containsKey(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+      return Optional.absent();
+    }
+    return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
+  }
+
+  /** Returns the bulk processor's maximum size in megabytes, or absent when it is not set. */
+  public Optional<Integer> getBulkFlushMaxSizeMB() {
+    if (!containsKey(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+      return Optional.absent();
+    }
+    return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB));
+  }
+
+  /** Returns the bulk processor's flush interval in milliseconds, or absent when it is not set. */
+  public Optional<Integer> getBulkFlushIntervalMS() {
+    if (!containsKey(CONFIG_KEY_BULK_FLUSH_INTERVALS_MS)) {
+      return Optional.absent();
+    }
+    return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_INTERVALS_MS));
+  }
+
+  /**
+   * Logs every entry of {@code config} at INFO level in a single message: a header line followed
+   * by one tab-indented {@code key = value} line per entry.
+   */
+  private void logAllSettings(Config config) {
+    StringBuilder settings = new StringBuilder();
+    settings.append("Elasticsearch System settings: ").append("\n");
+    for (Map.Entry<String, String> setting : config.entrySet()) {
+      settings.append('\t').append(setting.getKey());
+      settings.append(" = ").append(setting.getValue()).append("\n");
+    }
+    LOGGER.info(settings.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
new file mode 100644
index 0000000..0027531
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.config.ElasticsearchConfig;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+
+/**
+ * Creates Elasticsearch {@link BulkProcessor} instances based on properties from the Samza job.
+ */
+public class BulkProcessorFactory {
+  private final ElasticsearchConfig config;
+
+  /** Stores the Elasticsearch system config that supplies the bulk flush settings. */
+  public BulkProcessorFactory(ElasticsearchConfig config) {
+    this.config = config;
+  }
+
+  /** Builds a processor for {@code client}, applying only the flush limits that are configured. */
+  public BulkProcessor getBulkProcessor(Client client, BulkProcessor.Listener listener) {
+    final BulkProcessor.Builder bulk = BulkProcessor.builder(client, listener);
+    // Zero concurrent requests keeps the ordering of documents across batches, and it also
+    // makes BulkProcessor#flush() blocking, which is required.
+    bulk.setConcurrentRequests(0);
+
+    if (config.getBulkFlushMaxActions().isPresent()) {
+      bulk.setBulkActions(config.getBulkFlushMaxActions().get());
+    }
+    if (config.getBulkFlushMaxSizeMB().isPresent()) {
+      bulk.setBulkSize(new ByteSizeValue(config.getBulkFlushMaxSizeMB().get(), ByteSizeUnit.MB));
+    }
+    if (config.getBulkFlushIntervalMS().isPresent()) {
+      bulk.setFlushInterval(TimeValue.timeValueMillis(config.getBulkFlushIntervalMS().get()));
+    }
+    return bulk.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
new file mode 100644
index 0000000..1fd5dd3
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Elasticsearch does not make sense to be a changelog store at the moment, so every
+ * {@link SystemAdmin} operation of this class throws {@link UnsupportedOperationException}.
+ * The single shared instance is obtained through {@link #getInstance()}.
+ */
+public class ElasticsearchSystemAdmin implements SystemAdmin {
+  private static final SystemAdmin singleton = new ElasticsearchSystemAdmin();
+
+  private ElasticsearchSystemAdmin() {
+    // Private so that the shared singleton is the only instance.
+  }
+
+  public static SystemAdmin getInstance() {
+    return singleton;
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(
+      Map<SystemStreamPartition, String> offsets) {
+    throw new UnsupportedOperationException("Elasticsearch does not support offsets.");
+  }
+
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    throw new UnsupportedOperationException("Elasticsearch does not support stream metadata.");
+  }
+
+  @Override
+  public void createChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException("Elasticsearch does not support changelog streams.");
+  }
+
+  @Override
+  public void createCoordinatorStream(String streamName) {
+    throw new UnsupportedOperationException("Elasticsearch does not support coordinator streams.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
new file mode 100644
index 0000000..a277b69
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ElasticsearchConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.elasticsearch.client.ClientFactory;
+import org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory;
+import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
+import org.apache.samza.util.Util;
+import org.elasticsearch.client.Client;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * A {@link SystemFactory} for Elasticsearch.
+ *
+ * <p>This only supports the {@link SystemProducer}: {@link #getConsumer} throws an
+ * {@link UnsupportedOperationException} and {@link #getAdmin} returns an admin that does not
+ * support any operation.</p>
+ */
+public class ElasticsearchSystemFactory implements SystemFactory {
+
+  @Override
+  public SystemConsumer getConsumer(String name, Config config, MetricsRegistry metricsRegistry) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SystemProducer getProducer(String name, Config config, MetricsRegistry metricsRegistry) {
+    ElasticsearchConfig elasticsearchConfig = new ElasticsearchConfig(name, config);
+    return new ElasticsearchSystemProducer(name, getBulkProcessorFactory(elasticsearchConfig),
+        getClient(elasticsearchConfig), getIndexRequestFactory(elasticsearchConfig));
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String name, Config config) {
+    return ElasticsearchSystemAdmin.getInstance();
+  }
+
+  /** Returns a {@link BulkProcessorFactory} backed by the given config. */
+  protected static BulkProcessorFactory getBulkProcessorFactory(ElasticsearchConfig config) {
+    return new BulkProcessorFactory(config);
+  }
+
+  /**
+   * Instantiates the configured {@link ClientFactory} through its {@code (ElasticsearchConfig)}
+   * constructor and returns its {@link Client}; a reflective failure is rethrown as the cause of
+   * a {@link RuntimeException}. */
+  protected static Client getClient(ElasticsearchConfig config) {
+    String name = config.getClientFactoryClassName();
+    try {
+      Constructor<?> c = Class.forName(name).getConstructor(ElasticsearchConfig.class);
+      return ((ClientFactory) c.newInstance(config)).getClient();
+    } catch (InvocationTargetException | NoSuchMethodException | InstantiationException
+        | IllegalAccessException | ClassNotFoundException e) {
+      throw new RuntimeException(String.format("Could not instantiate class %s", name), e);
+    }
+  }
+
+  /** Returns the configured {@link IndexRequestFactory}, or the default one when none is set. */
+  protected static IndexRequestFactory getIndexRequestFactory(ElasticsearchConfig config) {
+    if (config.getIndexRequestFactoryClassName().isPresent()) {
+      return (IndexRequestFactory) Util.getObj(config.getIndexRequestFactoryClassName().get());
+    }
+    return new DefaultIndexRequestFactory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
new file mode 100644
index 0000000..7eb14a2
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link SystemProducer} for Elasticsearch that builds on top of the {@link BulkProcessor}.
+ *
+ * <p>Each source registered with this producer has an independent {@link BulkProcessor} that
+ * flushes separately to Elasticsearch. Each {@link BulkProcessor} will maintain the ordering of
+ * messages being sent from tasks per Samza container. If you have multiple containers writing to
+ * the same message id there is no guarantee of ordering in Elasticsearch.</p>
+ *
+ * <p>This can be fully configured from the Samza job properties. The client factory and index
+ * request are pluggable so the implementation of these can be changed if required.</p>
+ */
+public class ElasticsearchSystemProducer implements SystemProducer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSystemProducer.class);
+
+  private final String system;
+  private final Map<String, BulkProcessor> sourceBulkProcessor;
+  private final AtomicBoolean sendFailed = new AtomicBoolean(false);
+  private final AtomicReference<Throwable> thrown = new AtomicReference<>();
+
+  private final IndexRequestFactory indexRequestFactory;
+  private final BulkProcessorFactory bulkProcessorFactory;
+  private final Client client;
+
+  public ElasticsearchSystemProducer(String system, BulkProcessorFactory bulkProcessorFactory,
+                                     Client client, IndexRequestFactory indexRequestFactory) {
+    this.system = system;
+    this.sourceBulkProcessor = new HashMap<>();
+    this.bulkProcessorFactory = bulkProcessorFactory;
+    this.client = client;
+    this.indexRequestFactory = indexRequestFactory;
+  }
+
+  @Override
+  public void start() {
+    // Nothing to do.
+  }
+
+  /** Flushes and closes the registered bulk processors, then closes the client. */
+  @Override
+  public void stop() {
+    try {
+      for (Map.Entry<String, BulkProcessor> e : sourceBulkProcessor.entrySet()) {
+        try {
+          flush(e.getKey());
+        } finally {
+          e.getValue().close();
+        }
+      }
+    } finally {
+      // Release the client even when a flush or close throws.
+      client.close();
+    }
+  }
+
+  /** Creates the bulk processor for {@code source}; its listener records failed bulk requests. */
+  @Override
+  public void register(final String source) {
+    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
+        @Override
+        public void beforeBulk(long executionId, BulkRequest request) {
+          // Nothing to do.
+        }
+
+        @Override
+        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+          if (response.hasFailures()) {
+            LOGGER.error(response.buildFailureMessage());
+            sendFailed.set(true);
+          } else {
+            LOGGER.info(String.format("Written %s messages from %s to %s.",
+                                      response.getItems().length, source, system));
+          }
+        }
+
+        @Override
+        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+          thrown.compareAndSet(null, failure);
+          sendFailed.set(true);
+        }
+    };
+    sourceBulkProcessor.put(source, bulkProcessorFactory.getBulkProcessor(client, listener));
+  }
+
+  /** Converts the envelope to an index request and queues it on the source's bulk processor. */
+  @Override
+  public void send(String source, OutgoingMessageEnvelope envelope) {
+    IndexRequest indexRequest = indexRequestFactory.getIndexRequest(envelope);
+    sourceBulkProcessor.get(source).add(indexRequest);
+  }
+
+  /** Flushes the source's bulk processor; throws {@link SamzaException} once any send failed. */
+  @Override
+  public void flush(String source) {
+    sourceBulkProcessor.get(source).flush();
+    if (sendFailed.get()) {
+      String message = String.format("Unable to send message from %s to system %s.", source,
+                                     system);
+      LOGGER.error(message);
+      Throwable cause = thrown.get();
+      if (cause == null) {
+        throw new SamzaException(message);
+      }
+      throw new SamzaException(message, cause);
+    }
+    LOGGER.info(String.format("Flushed %s to %s.", source, system));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/ClientFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/ClientFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/ClientFactory.java
new file mode 100644
index 0000000..7256736
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/ClientFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.client;
+
+import org.elasticsearch.client.Client;
+
+/**
+ * A factory that produces {@link Client} instances for connecting to an Elasticsearch cluster.
+ */
+public interface ClientFactory {
+
+  Client getClient();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/NodeClientFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/NodeClientFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/NodeClientFactory.java
new file mode 100644
index 0000000..0ee9e3f
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/NodeClientFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.client;
+
+import org.apache.samza.config.ElasticsearchConfig;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+
+import java.util.Map;
+
+/**
+ * A {@link ClientFactory} that creates a {@link Node} client that connects to
+ * and joins an Elasticsearch cluster.
+ *
+ * <p>
+ * How the Node discovers and joins the cluster can be configured
+ * via Elasticsearch settings in the properties of the Samza job.
+ * </p>
+ */
+public class NodeClientFactory implements ClientFactory {
+  private final Map<String, String> clientSettings;
+
+  public NodeClientFactory(ElasticsearchConfig config) {
+    clientSettings = config.getElasticseachSettings();
+  }
+
+  @Override
+  public Client getClient() {
+    Settings settings = ImmutableSettings.settingsBuilder()
+        .put(clientSettings)
+        .build();
+
+    Node node = NodeBuilder.nodeBuilder()
+        .client(true)
+        .settings(settings)
+        .build();
+
+    return node.client();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
new file mode 100644
index 0000000..7f8f3f3
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.client;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ElasticsearchConfig;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.util.Map;
+
+/**
+ * A {@link ClientFactory} that creates a {@link TransportClient} that connects to
+ * an Elasticsearch cluster at the configured transport address.
+ *
+ * <p>
+ * This requires both the host and port properties to be set.
+ * Other settings can be configured via Elasticsearch settings in the properties of the Samza job.
+ * </p>
+ *
+ */
+public class TransportClientFactory implements ClientFactory {
+  private final Map<String, String> clientSettings;
+  private final String transportHost;
+  private final int transportPort;
+
+  /** Requires the transport host and port to be configured; throws SamzaException otherwise. */
+  public TransportClientFactory(ElasticsearchConfig config) {
+    clientSettings = config.getElasticseachSettings();
+
+    if (!config.getTransportHost().isPresent()) {
+      throw new SamzaException("You must specify the transport host for TransportClientFactory"
+                               + " with the Elasticsearch system.");
+    }
+    transportHost = config.getTransportHost().get();
+
+    if (!config.getTransportPort().isPresent()) {
+      throw new SamzaException("You must specify the transport port for TransportClientFactory"
+                               + " with the Elasticsearch system.");
+    }
+    transportPort = config.getTransportPort().get();
+  }
+
+  /** Creates a TransportClient from the client settings and adds the configured host and port. */
+  @Override
+  public Client getClient() {
+    Settings settings = ImmutableSettings.settingsBuilder()
+        .put(clientSettings)
+        .build();
+
+    TransportAddress address = new InetSocketTransportAddress(transportHost, transportPort);
+
+    return new TransportClient(settings).addTransportAddress(address);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
new file mode 100644
index 0000000..afe0eee
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.indexrequest;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.elasticsearch.action.index.IndexRequest;
+
+import java.util.Map;
+
+/**
+ * The default {@link IndexRequestFactory}.
+ *
+ * <p>Samza concepts are mapped to Elasticsearch concepts as follows:</p>
+ *
+ * <ul>
+ *   <li>
+ *     The index and type are derived from the stream name using
+ *     the pattern "{index-name}/{type-name}".
+ *   </li>
+ *   <li>
+ *     The id of the document is the {@link String} representation of the
+ *     {@link OutgoingMessageEnvelope#getKey()} from {@link Object#toString()} if provided.</li>
+ *   <li>
+ *     The source of the document is set from the {@link OutgoingMessageEnvelope#getMessage()}.
+ *     Supported types are {@code byte[]} and {@link Map}, which are both
+ *     passed on without serialising.
+ *   </li>
+ *   <li>
+ *     The routing key is set from the {@link String} representation of the
+ *     {@link OutgoingMessageEnvelope#getPartitionKey()} from {@link Object#toString()} if provided.
+ *   </li>
+ * </ul>
+ */
+public class DefaultIndexRequestFactory implements IndexRequestFactory {
+
+  @Override
+  public IndexRequest getIndexRequest(OutgoingMessageEnvelope envelope) {
+    String[] parts = envelope.getSystemStream().getStream().split("/");
+    if (parts.length != 2) {
+      throw new SamzaException("Elasticsearch stream name must match pattern {index}/{type}");
+    }
+    IndexRequest indexRequest = new IndexRequest(parts[0], parts[1]);
+
+    Object id = envelope.getKey();
+    if (id != null) {
+      indexRequest.id(id.toString());
+    }
+
+    Object partitionKey = envelope.getPartitionKey();
+    if (partitionKey != null) {
+      indexRequest.routing(partitionKey.toString());
+    }
+
+    Object message = envelope.getMessage();
+    if (message instanceof byte[]) {
+      indexRequest.source((byte[]) message);
+    } else if (message instanceof Map) {
+      indexRequest.source((Map) message);
+    } else {
+      // A null message matches neither branch; report it rather than hit a NullPointerException.
+      String typeName = (message == null) ? "null" : message.getClass().getCanonicalName();
+      throw new SamzaException("Unsupported message type: " + typeName);
+    }
+
+    return indexRequest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java
new file mode 100644
index 0000000..b7bd00d
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.indexrequest;
+
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.elasticsearch.action.index.IndexRequest;
+
+/**
+ * A factory that creates Elasticsearch {@link IndexRequest} instances from the Samza
+ * {@link OutgoingMessageEnvelope}
+ *
+ */
+public interface IndexRequestFactory {
+
+  IndexRequest getIndexRequest(OutgoingMessageEnvelope envelope);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/test/java/org/apache/samza/config/ElasticsearchConfigTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/config/ElasticsearchConfigTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/config/ElasticsearchConfigTest.java
new file mode 100644
index 0000000..a5861ba
--- /dev/null
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/config/ElasticsearchConfigTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class ElasticsearchConfigTest {
+
+  private ElasticsearchConfig EMPTY_CONFIG = new ElasticsearchConfig(
+      "es",
+      new MapConfig(Collections.<String, String>emptyMap()));
+
+  private ElasticsearchConfig configForProperty(String key, String value) {
+    Map<String, String> mapConfig = new HashMap<>();
+    mapConfig.put(key, value);
+    return new ElasticsearchConfig("es", new MapConfig(mapConfig));
+  }
+
+  @Test
+  public void testGetClientFactoryClassName() throws Exception {
+    ElasticsearchConfig config = configForProperty("systems.es.client.factory", "bar");
+
+    assertEquals("bar", config.getClientFactoryClassName());
+  }
+
+  @Test
+  public void testGetTransportHost() throws Exception {
+    assertFalse(EMPTY_CONFIG.getTransportHost().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.client.transport.host", "example.org");
+
+    assertTrue(config.getTransportHost().isPresent());
+    assertEquals("example.org", config.getTransportHost().get());
+  }
+
+  @Test
+  public void testGetTransportPort() throws Exception {
+    assertFalse(EMPTY_CONFIG.getTransportPort().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.client.transport.port", "9300");
+
+    assertTrue(config.getTransportPort().isPresent());
+    assertEquals(Integer.valueOf(9300), config.getTransportPort().get());
+  }
+
+  @Test
+  public void testGetElasticsearchSettings() throws Exception {
+    ElasticsearchConfig config = configForProperty("systems.es.client.elasticsearch.foo", "bar");
+
+    assertEquals("bar", config.getElasticseachSettings().get("foo"));
+  }
+
+  @Test
+  public void testGetBulkFlushMaxActions() throws Exception {
+    assertFalse(EMPTY_CONFIG.getBulkFlushMaxActions().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.bulk.flush.max.actions", "10");
+
+    assertEquals(Integer.valueOf(10), config.getBulkFlushMaxActions().get());
+  }
+
+  @Test
+  public void testGetBulkFlushMaxSizeMB() throws Exception {
+    assertFalse(EMPTY_CONFIG.getBulkFlushMaxSizeMB().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.bulk.flush.max.size.mb", "10");
+
+    assertTrue(config.getBulkFlushMaxSizeMB().isPresent());
+    assertEquals(Integer.valueOf(10), config.getBulkFlushMaxSizeMB().get());
+  }
+
+  @Test
+  public void testGetBulkFlushIntervalMS() throws Exception {
+    assertFalse(EMPTY_CONFIG.getBulkFlushIntervalMS().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.bulk.flush.interval.ms", "10");
+
+    assertTrue(config.getBulkFlushIntervalMS().isPresent());
+    assertEquals(Integer.valueOf(10), config.getBulkFlushIntervalMS().get());
+  }
+
+  @Test
+  public void testGetIndexRequestFactoryClassName() throws Exception {
+    assertFalse(EMPTY_CONFIG.getIndexRequestFactoryClassName().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.index.request.factory", "foo");
+
+    assertTrue(config.getIndexRequestFactoryClassName().isPresent());
+    assertEquals("foo", config.getIndexRequestFactoryClassName().get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
new file mode 100644
index 0000000..e63d62c
--- /dev/null
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ElasticsearchSystemProducerTest {
+  private static final String SYSTEM_NAME = "es";
+  private static final BulkProcessorFactory BULK_PROCESSOR_FACTORY = mock(BulkProcessorFactory.class);
+  private static final Client CLIENT = mock(Client.class);
+  private static final IndexRequestFactory INDEX_REQUEST_FACTORY = mock(IndexRequestFactory.class);
+  public static final String SOURCE_ONE = "one";
+  public static final String SOURCE_TWO = "two";
+  private SystemProducer producer;
+  public static BulkProcessor processorOne;
+  public static BulkProcessor processorTwo;
+
+  @Before
+  public void setUp() throws Exception {
+    producer = new ElasticsearchSystemProducer(SYSTEM_NAME,
+                                               BULK_PROCESSOR_FACTORY,
+                                               CLIENT,
+                                               INDEX_REQUEST_FACTORY);
+
+    processorOne = mock(BulkProcessor.class);
+    processorTwo = mock(BulkProcessor.class);
+
+    when(BULK_PROCESSOR_FACTORY.getBulkProcessor(eq(CLIENT), any(BulkProcessor.Listener.class)))
+        .thenReturn(processorOne);
+    producer.register(SOURCE_ONE);
+
+    when(BULK_PROCESSOR_FACTORY.getBulkProcessor(eq(CLIENT), any(BulkProcessor.Listener.class)))
+        .thenReturn(processorTwo);
+    producer.register(SOURCE_TWO);
+  }
+
+  @Test
+  public void testRegisterStop() throws Exception {
+    producer.stop();
+
+    verify(processorOne).flush();
+    verify(processorTwo).flush();
+
+    verify(processorOne).close();
+    verify(processorTwo).close();
+
+    verify(CLIENT).close();
+  }
+
+  @Test
+  public void testSend() throws Exception {
+    OutgoingMessageEnvelope envelope = mock(OutgoingMessageEnvelope.class);
+    IndexRequest indexRequest = mock(IndexRequest.class);
+
+    when(INDEX_REQUEST_FACTORY.getIndexRequest(envelope)).thenReturn(indexRequest);
+
+    producer.send(SOURCE_ONE, envelope);
+
+    verify(processorOne).add(indexRequest);
+  }
+
+  @Test
+  public void testFlushNoFailedSend() throws Exception {
+    producer.flush(SOURCE_ONE);
+
+    verify(processorOne).flush();
+    verify(processorTwo, never()).flush();
+  }
+
+  @Test(expected=SamzaException.class)
+  public void testFlushFailedSendFromException() throws Exception {
+    ArgumentCaptor<BulkProcessor.Listener> listenerCaptor =
+        ArgumentCaptor.forClass(BulkProcessor.Listener.class);
+
+    when(BULK_PROCESSOR_FACTORY.getBulkProcessor(eq(CLIENT), listenerCaptor.capture()))
+        .thenReturn(processorOne);
+    producer.register(SOURCE_ONE);
+
+    listenerCaptor.getValue().afterBulk(0, null, new Throwable());
+
+    producer.flush(SOURCE_ONE);
+  }
+
+  @Test(expected=SamzaException.class)
+  public void testFlushFailedSendFromFailedDocument() throws Exception {
+    ArgumentCaptor<BulkProcessor.Listener> listenerCaptor =
+        ArgumentCaptor.forClass(BulkProcessor.Listener.class);
+
+    when(BULK_PROCESSOR_FACTORY.getBulkProcessor(eq(CLIENT), listenerCaptor.capture()))
+        .thenReturn(processorOne);
+    producer.register(SOURCE_ONE);
+
+    BulkResponse response = mock(BulkResponse.class);
+    when(response.hasFailures()).thenReturn(true);
+
+    listenerCaptor.getValue().afterBulk(0, null, response);
+
+    producer.flush(SOURCE_ONE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactoryTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactoryTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactoryTest.java
new file mode 100644
index 0000000..61c3e7d
--- /dev/null
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactoryTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.indexrequest;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.base.Charsets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class DefaultIndexRequestFactoryTest {
+
+  private static final IndexRequestFactory indexRequestFactory = new DefaultIndexRequestFactory();
+  private static final String TYPE = "type";
+  private static final String INDEX = "index";
+  private static final SystemStream SYSTEM = mock(SystemStream.class);
+  private static final Map EMPTY_MSG = Collections.emptyMap();
+
+  @Before
+  public void setup() {
+    when(SYSTEM.getStream()).thenReturn(INDEX + "/" + TYPE);
+  }
+
+  @Test
+  public void testGetIndexRequestStreamName()  {
+    IndexRequest indexRequest = indexRequestFactory.
+        getIndexRequest(new OutgoingMessageEnvelope(SYSTEM, EMPTY_MSG));
+
+    assertEquals(INDEX, indexRequest.index());
+    assertEquals(TYPE, indexRequest.type());
+  }
+
+  @Test(expected=SamzaException.class)
+  public void testGetIndexRequestInvalidStreamName()  {
+    when(SYSTEM.getStream()).thenReturn(INDEX);
+    indexRequestFactory.getIndexRequest(new OutgoingMessageEnvelope(SYSTEM, EMPTY_MSG));
+  }
+
+  @Test
+  public void testGetIndexRequestNoId() throws Exception {
+    IndexRequest indexRequest =
+        indexRequestFactory.getIndexRequest(new OutgoingMessageEnvelope(SYSTEM, EMPTY_MSG));
+
+    assertNull(indexRequest.id());
+  }
+
+  @Test
+  public void testGetIndexRequestWithId() throws Exception {
+    IndexRequest indexRequest =
+        indexRequestFactory.getIndexRequest(new OutgoingMessageEnvelope(SYSTEM, "id", EMPTY_MSG));
+
+    assertEquals("id", indexRequest.id());
+  }
+
+  @Test
+  public void testGetIndexRequestNoPartitionKey() throws Exception {
+    IndexRequest indexRequest = indexRequestFactory.getIndexRequest(
+        new OutgoingMessageEnvelope(SYSTEM, EMPTY_MSG));
+
+    assertNull(indexRequest.routing());
+  }
+
+  @Test
+  public void testGetIndexRequestWithPartitionKey() throws Exception {
+    IndexRequest indexRequest = indexRequestFactory.getIndexRequest(
+        new OutgoingMessageEnvelope(SYSTEM, "shardKey", "id", EMPTY_MSG));
+
+    assertEquals("shardKey", indexRequest.routing());
+  }
+
+  @Test
+  public void testGetIndexRequestMessageBytes() throws Exception {
+    IndexRequest indexRequest = indexRequestFactory.getIndexRequest(
+        new OutgoingMessageEnvelope(SYSTEM, "{\"foo\":\"bar\"}".getBytes(Charsets.UTF_8)));
+
+    assertEquals(Collections.singletonMap("foo", "bar"), indexRequest.sourceAsMap());
+  }
+
+  @Test
+  public void testGetIndexRequestMessageMap() throws Exception {
+    IndexRequest indexRequest = indexRequestFactory.getIndexRequest(
+        new OutgoingMessageEnvelope(SYSTEM, Collections.singletonMap("foo", "bar")));
+
+    assertEquals(Collections.singletonMap("foo", "bar"), indexRequest.sourceAsMap());
+  }
+
+  @Test(expected=SamzaException.class)
+  public void testGetIndexRequestInvalidMessage() throws Exception {
+    indexRequestFactory.getIndexRequest(new OutgoingMessageEnvelope(SYSTEM, "{'foo':'bar'}"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index bb07a3b..19bff97 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,18 +18,26 @@
  */
 include \
   'samza-api',
-  'samza-core',
-  'samza-kafka',
-  'samza-kv',
-  'samza-kv-inmemory',
-  'samza-kv-rocksdb',
+  'samza-elasticsearch',
   'samza-log4j',
-  'samza-shell',
-  'samza-yarn',
-  'samza-test'
+  'samza-shell'
+
+def scalaModules = [
+        'samza-core',
+        'samza-kafka',
+        'samza-kv',
+        'samza-kv-inmemory',
+        'samza-kv-rocksdb',
+        'samza-yarn',
+        'samza-test'
+] as HashSet
+
+scalaModules.each {
+  include it
+}
 
 rootProject.children.each {
-  if (it.name != 'samza-api' && it.name != 'samza-shell' && it.name != 'samza-log4j') {
+  if (scalaModules.contains(it.name)) {
     it.name = it.name + "_" + scalaVersion
   }
 }