You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/07/10 23:19:00 UTC
samza git commit: SAMZA-654: added Elasticsearch Producer
Repository: samza
Updated Branches:
refs/heads/master 99dbca19d -> fa6ca0b14
SAMZA-654: added Elasticsearch Producer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa6ca0b1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa6ca0b1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa6ca0b1
Branch: refs/heads/master
Commit: fa6ca0b1466099a289a04b5fd5674fffbcca5ca3
Parents: 99dbca1
Author: Dan Harvey <da...@gmail.com>
Authored: Fri Jul 10 14:18:36 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Fri Jul 10 14:18:36 2015 -0700
----------------------------------------------------------------------
build.gradle | 16 ++
.../versioned/jobs/configuration-table.html | 107 ++++++++++++++
gradle/dependency-versions.gradle | 1 +
.../samza/config/ElasticsearchConfig.java | 132 +++++++++++++++++
.../elasticsearch/BulkProcessorFactory.java | 58 ++++++++
.../elasticsearch/ElasticsearchSystemAdmin.java | 65 +++++++++
.../ElasticsearchSystemFactory.java | 91 ++++++++++++
.../ElasticsearchSystemProducer.java | 145 +++++++++++++++++++
.../elasticsearch/client/ClientFactory.java | 31 ++++
.../elasticsearch/client/NodeClientFactory.java | 60 ++++++++
.../client/TransportClientFactory.java | 76 ++++++++++
.../DefaultIndexRequestFactory.java | 85 +++++++++++
.../indexrequest/IndexRequestFactory.java | 34 +++++
.../samza/config/ElasticsearchConfigTest.java | 117 +++++++++++++++
.../ElasticsearchSystemProducerTest.java | 134 +++++++++++++++++
.../DefaultIndexRequestFactoryTest.java | 118 +++++++++++++++
settings.gradle | 26 ++--
17 files changed, 1287 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index a5f5410..0852adc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -157,6 +157,22 @@ project(":samza-core_$scalaVersion") {
}
}
+// Elasticsearch system for Samza (producer only). The Elasticsearch client version is taken from
+// elasticsearchVersion in gradle/dependency-versions.gradle.
+project(':samza-elasticsearch') {
+ apply plugin: 'java'
+
+ dependencies {
+ compile project(':samza-api')
+ compile project(":samza-core_$scalaVersion")
+ // Elasticsearch Java client (Client, BulkProcessor, node and transport clients).
+ compile "org.elasticsearch:elasticsearch:$elasticsearchVersion"
+ compile "org.slf4j:slf4j-api:$slf4jVersion"
+ testCompile "junit:junit:$junitVersion"
+ testCompile "org.mockito:mockito-all:$mockitoVersion"
+
+ // Bind SLF4J to a simple logger at test runtime so log output is visible in tests.
+ testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
+ }
+}
+
project(":samza-kafka_$scalaVersion") {
apply plugin: 'scala'
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 405e2ce..cd7ea8d 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -638,6 +638,113 @@
</tr>
<tr>
+ <th colspan="3" class="section" id="elasticsearch">
+ Using <a href="https://github.com/elastic/elasticsearch">Elasticsearch</a> for output streams<br>
+ <span class="subtitle">
+ (This section applies if you have set
+ <a href="#systems-samza-factory" class="property">systems.*.samza.factory</a>
+ <code>= org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory</code>)
+ </span>
+ </th>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-samza-client-factory-class">systems.<span class="system">system-name</span>.<br>client.factory</td>
+ <td class="default"></td>
+ <td class="description">
+ <strong>Required:</strong> The Elasticsearch client factory used for connecting
+ to the Elasticsearch cluster. Samza ships with the following implementations:
+ <dl>
+ <dt><code>org.apache.samza.system.elasticsearch.client.TransportClientFactory</code></dt>
+ <dd>Creates a TransportClient that connects to the cluster remotely without
+ joining it. This requires the transport host and port properties to be set.</dd>
+ <dt><code>org.apache.samza.system.elasticsearch.client.NodeClientFactory</code></dt>
+ <dd>Creates a Node client that connects to the cluster by joining it. By default
+ this uses <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-zen.html">zen discovery</a> to find the cluster but other methods can be configured.</dd>
+ </dl>
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-samza-index-request-factory-class">systems.<span class="system">system-name</span>.<br>index.request.factory</td>
+ <td class="default">org.apache.samza.system<br>.elasticsearch.indexrequest.<br>DefaultIndexRequestFactory</td>
+ <td class="description">
+ The index request factory that converts the Samza OutgoingMessageEnvelope into the IndexRequest
+ to be sent to Elasticsearch. The default IndexRequestFactory behaves as follows:
+ <dl>
+ <dt><code>Stream name</code></dt>
+ <dd>The stream name has the format {index-name}/{type-name}; the two parts
+ map onto the Elasticsearch index and type.</dd>
+ <dt><code>Message id</code></dt>
+ <dd>If the message has a key this is set as the document id, otherwise Elasticsearch will generate one for each document.</dd>
+ <dt><code>Partition id</code></dt>
+ <dd>If the partition key is set then this is used as the Elasticsearch routing key.</dd>
+ <dt><code>Message</code></dt>
+ <dd>The message must be either a byte[] which is passed directly on to Elasticsearch, or a Map which is passed on to the
+ Elasticsearch client which serialises it into a JSON String. Samza serdes are not currently supported.</dd>
+ </dl>
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-samza-client-host">systems.<span class="system">system-name</span>.<br>client.transport.host</td>
+ <td class="default"></td>
+ <td class="description">
+ <strong>Required</strong> for <code>TransportClientFactory</code>
+ <p>The hostname that the TransportClientFactory connects to.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-samza-client-port">systems.<span class="system">system-name</span>.<br>client.transport.port</td>
+ <td class="default"></td>
+ <td class="description">
+ <strong>Required</strong> for <code>TransportClientFactory</code>
+ <p>The port that the TransportClientFactory connects to.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-samza-client-settings">systems.<span class="system">system-name</span>.<br>client.elasticsearch.*</td>
+ <td class="default"></td>
+ <td class="description">
+ Any <a href="http://www.elastic.co/guide/en/elasticsearch/client/java-api/1.x/client.html">Elasticsearch client settings</a> can be used here. They will all be passed to both the transport and node clients.
+ Some of the common settings you will want to provide are:
+ <dl>
+ <dt><code>systems.<span class="system">system-name</span>.client.elasticsearch.cluster.name</code></dt>
+ <dd>The name of the Elasticsearch cluster the client is connecting to.</dd>
+ <dt><code>systems.<span class="system">system-name</span>.client.elasticsearch.client.transport.sniff</code></dt>
+ <dd>If set to <code>true</code> then the transport client will discover and keep
+ up to date all cluster nodes. This is used for load balancing and fail-over on retries.</dd>
+ </dl>
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-samza-bulk-flush-max-actions">systems.<span class="system">system-name</span>.<br>bulk.flush.max.actions</td>
+ <td class="default">1000</td>
+ <td class="description">
+ The maximum number of messages to be buffered before flushing.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-samza-bulk-flush-max-size-mb">systems.<span class="system">system-name</span>.<br>bulk.flush.max.size.mb</td>
+ <td class="default">5</td>
+ <td class="description">
+ The maximum aggregate size, in megabytes, of buffered messages before flushing.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="systems-samza-bulk-flush-interval-ms">systems.<span class="system">system-name</span>.<br>bulk.flush.interval.ms</td>
+ <td class="default">never</td>
+ <td class="description">
+ How often, in milliseconds, buffered messages should be flushed.
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="kafka">
Using <a href="http://kafka.apache.org/">Kafka</a> for input streams, output streams and checkpoints<br>
<span class="subtitle">
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index d79213f..fb06e8e 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -17,6 +17,7 @@
* under the License.
*/
ext {
+ elasticsearchVersion = "1.5.1"
jodaTimeVersion = "2.2"
joptSimpleVersion = "3.2"
jacksonVersion = "1.8.5"
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
new file mode 100644
index 0000000..6091feb
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/config/ElasticsearchConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import org.apache.samza.SamzaException;
+import org.elasticsearch.common.base.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Elasticsearch configuration class to read elasticsearch specific configuration from Samza.
+ */
+public class ElasticsearchConfig extends MapConfig {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);
+
+ public static final String CONFIG_KEY_CLIENT_FACTORY = "client.factory";
+ public static final String PREFIX_ELASTICSEARCH_SETTINGS = "client.elasticsearch.";
+
+ public static final String CONFIG_KEY_INDEX_REQUEST_FACTORY = "index.request.factory";
+
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
+ public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
+ public static final String CONFIG_KEY_BULK_FLUSH_INTERVALS_MS = "bulk.flush.interval.ms";
+
+ public static final String CONFIG_KEY_CLIENT_TRANSPORT_HOST = "client.transport.host";
+ public static final String CONFIG_KEY_CLIENT_TRANSPORT_PORT = "client.transport.port";
+
+ public ElasticsearchConfig(String name, Config config) {
+ super(config.subset("systems." + name + "."));
+
+ logAllSettings(this);
+ }
+
+ // Client settings
+ public String getClientFactoryClassName() {
+ if (containsKey(CONFIG_KEY_CLIENT_FACTORY)) {
+ return get(CONFIG_KEY_CLIENT_FACTORY);
+ } else {
+ throw new SamzaException("You must specify a client factory class"
+ + " for the Elasticsearch system.");
+ }
+ }
+
+ public Config getElasticseachSettings() {
+ return subset(PREFIX_ELASTICSEARCH_SETTINGS);
+ }
+
+ // Index Request
+ public Optional<String> getIndexRequestFactoryClassName() {
+ if (containsKey(CONFIG_KEY_INDEX_REQUEST_FACTORY)) {
+ return Optional.of(get(CONFIG_KEY_INDEX_REQUEST_FACTORY));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ // Transport client settings
+ public Optional<String> getTransportHost() {
+ if (containsKey(CONFIG_KEY_CLIENT_TRANSPORT_HOST)) {
+ return Optional.of(get(CONFIG_KEY_CLIENT_TRANSPORT_HOST));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ public Optional<Integer> getTransportPort() {
+ if (containsKey(CONFIG_KEY_CLIENT_TRANSPORT_PORT)) {
+ return Optional.of(getInt(CONFIG_KEY_CLIENT_TRANSPORT_PORT));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ // Bulk processor settings
+ public Optional<Integer> getBulkFlushMaxActions() {
+ if (containsKey(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
+ return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ public Optional<Integer> getBulkFlushMaxSizeMB() {
+ if (containsKey(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
+ return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ public Optional<Integer> getBulkFlushIntervalMS() {
+ if (containsKey(CONFIG_KEY_BULK_FLUSH_INTERVALS_MS)) {
+ return Optional.of(getInt(CONFIG_KEY_BULK_FLUSH_INTERVALS_MS));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ private void logAllSettings(Config config) {
+ StringBuilder b = new StringBuilder();
+ b.append("Elasticsearch System settings: ");
+ b.append("\n");
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ b.append('\t');
+ b.append(entry.getKey());
+ b.append(" = ");
+ b.append(entry.getValue());
+ b.append("\n");
+ }
+ LOGGER.info(b.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
new file mode 100644
index 0000000..0027531
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/BulkProcessorFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.config.ElasticsearchConfig;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+
+/**
+ * Creates Elasticsearch {@link BulkProcessor} instances based on properties from the Samza job.
+ */
+public class BulkProcessorFactory {
+ private final ElasticsearchConfig config;
+
+ public BulkProcessorFactory(ElasticsearchConfig config) {
+ this.config = config;
+ }
+
+ public BulkProcessor getBulkProcessor(Client client, BulkProcessor.Listener listener) {
+ BulkProcessor.Builder builder = BulkProcessor.builder(client, listener);
+
+ // Concurrent requests set to 0 to ensure ordering of documents is maintained in batches.
+ // This also means BulkProcessor#flush() is blocking as is also required.
+ builder.setConcurrentRequests(0);
+
+ if (config.getBulkFlushMaxActions().isPresent()) {
+ builder.setBulkActions(config.getBulkFlushMaxActions().get());
+ }
+ if (config.getBulkFlushMaxSizeMB().isPresent()) {
+ builder.setBulkSize(new ByteSizeValue(config.getBulkFlushMaxSizeMB().get(), ByteSizeUnit.MB));
+ }
+ if (config.getBulkFlushIntervalMS().isPresent()) {
+ builder.setFlushInterval(TimeValue.timeValueMillis(config.getBulkFlushIntervalMS().get()));
+ }
+
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
new file mode 100644
index 0000000..1fd5dd3
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link SystemAdmin} for Elasticsearch.
+ *
+ * <p>Elasticsearch is only supported as an output system and does not make sense as a changelog
+ * store at the moment, so every method on this class throws an
+ * {@link UnsupportedOperationException}.</p>
+ */
+public class ElasticsearchSystemAdmin implements SystemAdmin {
+  private static final SystemAdmin INSTANCE = new ElasticsearchSystemAdmin();
+
+  private ElasticsearchSystemAdmin() {
+    // The admin holds no state; use getInstance() instead.
+  }
+
+  /** Returns the shared instance. */
+  public static SystemAdmin getInstance() {
+    return INSTANCE;
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(
+      Map<SystemStreamPartition, String> offsets) {
+    throw new UnsupportedOperationException(
+        "Elasticsearch is an output-only system and has no offsets.");
+  }
+
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    throw new UnsupportedOperationException(
+        "Elasticsearch is an output-only system and has no stream metadata.");
+  }
+
+  @Override
+  public void createChangelogStream(String streamName, int numOfPartitions) {
+    throw new UnsupportedOperationException(
+        "Elasticsearch cannot be used as a changelog store.");
+  }
+
+  @Override
+  public void createCoordinatorStream(String streamName) {
+    throw new UnsupportedOperationException(
+        "Elasticsearch cannot be used for the coordinator stream.");
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
new file mode 100644
index 0000000..a277b69
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ElasticsearchConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.elasticsearch.client.ClientFactory;
+import org.apache.samza.system.elasticsearch.indexrequest.DefaultIndexRequestFactory;
+import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
+import org.apache.samza.util.Util;
+import org.elasticsearch.client.Client;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * A {@link SystemFactory} for Elasticsearch.
+ *
+ * <p>Only the {@link SystemProducer} is supported: {@link #getConsumer} throws an
+ * {@link UnsupportedOperationException}, and {@link #getAdmin} returns an
+ * {@link ElasticsearchSystemAdmin}, whose operations are all unsupported as well.</p>
+ */
+public class ElasticsearchSystemFactory implements SystemFactory {
+
+  @Override
+  public SystemConsumer getConsumer(String name, Config config, MetricsRegistry metricsRegistry) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SystemProducer getProducer(String name, Config config, MetricsRegistry metricsRegistry) {
+    ElasticsearchConfig elasticsearchConfig = new ElasticsearchConfig(name, config);
+    return new ElasticsearchSystemProducer(name,
+        getBulkProcessorFactory(elasticsearchConfig),
+        getClient(elasticsearchConfig),
+        getIndexRequestFactory(elasticsearchConfig));
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String name, Config config) {
+    return ElasticsearchSystemAdmin.getInstance();
+  }
+
+  /** Creates the factory the producer uses to build one bulk processor per registered source. */
+  protected static BulkProcessorFactory getBulkProcessorFactory(ElasticsearchConfig config) {
+    return new BulkProcessorFactory(config);
+  }
+
+  /**
+   * Instantiates the configured {@link ClientFactory} and returns the {@link Client} it creates.
+   *
+   * <p>The configured class must implement {@link ClientFactory} and have a public constructor
+   * taking a single {@link ElasticsearchConfig}.</p>
+   *
+   * @throws RuntimeException if the factory class cannot be loaded or instantiated; the underlying
+   *                          error is attached as the cause
+   */
+  protected static Client getClient(ElasticsearchConfig config) {
+    String name = config.getClientFactoryClassName();
+    ClientFactory clientFactory;
+    try {
+      clientFactory = Class.forName(name)
+          .asSubclass(ClientFactory.class)
+          .getConstructor(ElasticsearchConfig.class)
+          .newInstance(config);
+    } catch (ReflectiveOperationException | ClassCastException e) {
+      // Keep e as the cause: an InvocationTargetException wraps the constructor's own failure.
+      throw new RuntimeException(String.format("Could not instantiate class %s", name), e);
+    }
+    // Called outside the try so that failures while creating the client are not misreported as
+    // instantiation failures.
+    return clientFactory.getClient();
+  }
+
+  /**
+   * Returns an instance of the configured {@link IndexRequestFactory}, or a
+   * {@link DefaultIndexRequestFactory} when no factory class is configured.
+   */
+  protected static IndexRequestFactory getIndexRequestFactory(ElasticsearchConfig config) {
+    if (config.getIndexRequestFactoryClassName().isPresent()) {
+      return (IndexRequestFactory) Util.getObj(config.getIndexRequestFactoryClassName().get());
+    } else {
+      return new DefaultIndexRequestFactory();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
new file mode 100644
index 0000000..7eb14a2
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link SystemProducer} for Elasticsearch that builds on top of the {@link BulkProcessor}.
+ *
+ * <p>
+ * Every source registered with this producer gets its own independent {@link BulkProcessor}, and
+ * each of them flushes to Elasticsearch separately. Each {@link BulkProcessor} will maintain the
+ * ordering of messages being sent from its source per Samza container. If you have multiple
+ * containers writing to the same message id there is no guarantee of ordering in Elasticsearch.
+ * </p>
+ *
+ * <p>
+ * Failures are sticky: once any bulk request of this producer has failed, every later call to
+ * {@link #flush(String)} throws a {@link SamzaException}.
+ * </p>
+ *
+ * <p>
+ * This can be fully configured from the Samza job properties. The client factory and index request
+ * factory are pluggable so the implementation of these can be changed if required.
+ * </p>
+ */
+public class ElasticsearchSystemProducer implements SystemProducer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSystemProducer.class);
+
+  private final String system;
+  private final Map<String, BulkProcessor> sourceBulkProcessor;
+  // Set by any source's listener once a bulk request fails; never reset.
+  private final AtomicBoolean sendFailed = new AtomicBoolean(false);
+  // The first exception reported by a failed bulk request; used as the cause in flush().
+  private final AtomicReference<Throwable> thrown = new AtomicReference<>();
+
+  private final IndexRequestFactory indexRequestFactory;
+  private final BulkProcessorFactory bulkProcessorFactory;
+
+  private final Client client;
+
+  /**
+   * @param system               the Samza system name, used in log and error messages
+   * @param bulkProcessorFactory creates one {@link BulkProcessor} per registered source
+   * @param client               the Elasticsearch client; closed by {@link #stop()}
+   * @param indexRequestFactory  converts outgoing envelopes into {@link IndexRequest}s
+   */
+  public ElasticsearchSystemProducer(String system, BulkProcessorFactory bulkProcessorFactory,
+                                     Client client, IndexRequestFactory indexRequestFactory) {
+    this.system = system;
+    this.sourceBulkProcessor = new HashMap<>();
+    this.bulkProcessorFactory = bulkProcessorFactory;
+    this.client = client;
+    this.indexRequestFactory = indexRequestFactory;
+  }
+
+  @Override
+  public void start() {
+    // Nothing to do.
+  }
+
+  /**
+   * Flushes and closes every registered {@link BulkProcessor}, then closes the client.
+   *
+   * <p>Every processor and the client are closed even when a flush fails; the first flush failure
+   * is rethrown once everything has been closed.</p>
+   */
+  @Override
+  public void stop() {
+    RuntimeException firstFailure = null;
+    try {
+      for (Map.Entry<String, BulkProcessor> entry : sourceBulkProcessor.entrySet()) {
+        try {
+          flush(entry.getKey());
+        } catch (RuntimeException failure) {
+          if (firstFailure == null) {
+            firstFailure = failure;
+          }
+        } finally {
+          entry.getValue().close();
+        }
+      }
+    } finally {
+      client.close();
+    }
+
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+  }
+
+  /**
+   * Creates the {@link BulkProcessor} for {@code source}. Its listener records failures so that
+   * the next {@link #flush(String)} reports them.
+   */
+  @Override
+  public void register(final String source) {
+    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
+      @Override
+      public void beforeBulk(long executionId, BulkRequest request) {
+        // Nothing to do.
+      }
+
+      @Override
+      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+        if (response.hasFailures()) {
+          sendFailed.set(true);
+          // flush() only reports that a send failed, so log the per-item reasons here.
+          LOGGER.error(String.format("Failed to write messages from %s to %s: %s", source,
+              system, response.buildFailureMessage()));
+        } else {
+          LOGGER.info(String.format("Written %s messages from %s to %s.",
+              response.getItems().length, source, system));
+        }
+      }
+
+      @Override
+      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+        // Record the cause before raising the flag so flush() can attach it.
+        thrown.compareAndSet(null, failure);
+        sendFailed.set(true);
+      }
+    };
+
+    sourceBulkProcessor.put(source, bulkProcessorFactory.getBulkProcessor(client, listener));
+  }
+
+  /**
+   * Converts {@code envelope} into an {@link IndexRequest} and adds it to the source's processor.
+   *
+   * @throws SamzaException if {@code source} was never registered
+   */
+  @Override
+  public void send(String source, OutgoingMessageEnvelope envelope) {
+    IndexRequest indexRequest = indexRequestFactory.getIndexRequest(envelope);
+    getBulkProcessor(source).add(indexRequest);
+  }
+
+  /**
+   * Flushes the source's {@link BulkProcessor}, then checks whether any bulk request has failed.
+   *
+   * @throws SamzaException if {@code source} was never registered, or if any bulk request of this
+   *                        producer has failed so far (with the first reported exception as the
+   *                        cause when there is one)
+   */
+  @Override
+  public void flush(String source) {
+    getBulkProcessor(source).flush();
+
+    if (sendFailed.get()) {
+      String message = String.format("Unable to send message from %s to system %s.", source,
+          system);
+      LOGGER.error(message);
+
+      Throwable cause = thrown.get();
+      if (cause != null) {
+        throw new SamzaException(message, cause);
+      } else {
+        throw new SamzaException(message);
+      }
+    }
+
+    LOGGER.info(String.format("Flushed %s to %s.", source, system));
+  }
+
+  /** Returns the processor registered for {@code source}, failing clearly when there is none. */
+  private BulkProcessor getBulkProcessor(String source) {
+    BulkProcessor processor = sourceBulkProcessor.get(source);
+    if (processor == null) {
+      throw new SamzaException(String.format(
+          "Source %s has not been registered with system %s.", source, system));
+    }
+    return processor;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/ClientFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/ClientFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/ClientFactory.java
new file mode 100644
index 0000000..7256736
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/ClientFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.client;
+
+import org.elasticsearch.client.Client;
+
+/**
+ * A factory that produces {@link Client} instances for connecting to an Elasticsearch cluster.
+ *
+ * <p>The implementation is selected with the system's {@code client.factory} property.
+ * {@code ElasticsearchSystemFactory} instantiates it reflectively, so an implementation needs a
+ * public constructor taking a single {@code ElasticsearchConfig}.</p>
+ */
+public interface ClientFactory {
+
+ /**
+  * Returns the client the Elasticsearch system uses to talk to the cluster.
+  *
+  * @return a {@link Client}; {@code ElasticsearchSystemProducer} closes it when it is stopped
+  */
+ Client getClient();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/NodeClientFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/NodeClientFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/NodeClientFactory.java
new file mode 100644
index 0000000..0ee9e3f
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/NodeClientFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.client;
+
+import org.apache.samza.config.ElasticsearchConfig;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+
+import java.util.Map;
+
+/**
+ * A {@link ClientFactory} that creates a {@link Node} client that connects to
+ * and joins an Elasticsearch cluster.
+ *
+ * <p>
+ * How the Node discovers and joins the cluster can be configured
+ * via Elasticsearch settings in the properties of the Samza job.
+ * </p>
+ */
+public class NodeClientFactory implements ClientFactory {
+  /** Elasticsearch settings from the job config, applied to every node this factory builds. */
+  private final Map<String, String> clientSettings;
+
+  /**
+   * @param config the Elasticsearch system config that supplies the node settings
+   */
+  public NodeClientFactory(ElasticsearchConfig config) {
+    clientSettings = config.getElasticseachSettings();
+  }
+
+  /**
+   * Builds and starts a client node ({@code client(true)}) and returns its {@link Client}.
+   *
+   * <p>
+   * The node has to be started before it can discover and join the cluster; a node that is
+   * only built never connects, so requests sent through its client could not succeed.
+   * </p>
+   *
+   * @return the client of the newly started node
+   */
+  @Override
+  public Client getClient() {
+    Settings settings = ImmutableSettings.settingsBuilder()
+        .put(clientSettings)
+        .build();
+
+    // node() builds *and starts* the node, whereas build() would leave it unstarted.
+    Node node = NodeBuilder.nodeBuilder()
+        .client(true)
+        .settings(settings)
+        .node();
+
+    // NOTE(review): no reference to the Node is kept, so nothing here ever closes it; confirm
+    // that closing the returned client is enough to release the node's resources.
+    return node.client();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
new file mode 100644
index 0000000..7f8f3f3
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/client/TransportClientFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.client;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ElasticsearchConfig;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.util.Map;
+
+/**
+ * A {@link ClientFactory} that creates a {@link TransportClient} connecting to an
+ * Elasticsearch cluster through the configured transport host and port.
+ *
+ * <p>
+ * This requires both the host and port properties to be set.
+ * Other settings can be configured via Elasticsearch settings in the properties of the Samza job.
+ * </p>
+ */
+public class TransportClientFactory implements ClientFactory {
+  /** Elasticsearch settings taken from the job config. */
+  private final Map<String, String> clientSettings;
+  /** Host of the transport address the client connects to. */
+  private final String transportHost;
+  /** Port of the transport address the client connects to. */
+  private final int transportPort;
+
+  /**
+   * @param config the Elasticsearch system config
+   * @throws SamzaException if the transport host or the transport port is not configured
+   */
+  public TransportClientFactory(ElasticsearchConfig config) {
+    clientSettings = config.getElasticseachSettings();
+
+    if (config.getTransportHost().isPresent()) {
+      transportHost = config.getTransportHost().get();
+    } else {
+      throw new SamzaException("You must specify the transport host for TransportClientFactory"
+          + " with the Elasticsearch system.");
+    }
+
+    if (config.getTransportPort().isPresent()) {
+      transportPort = config.getTransportPort().get();
+    } else {
+      throw new SamzaException("You must specify the transport port for TransportClientFactory"
+          + " with the Elasticsearch system.");
+    }
+  }
+
+  /**
+   * Creates a new {@link TransportClient} with the configured settings, pointed at the
+   * configured host and port.
+   *
+   * @return the new transport client
+   */
+  @Override
+  public Client getClient() {
+    Settings settings = ImmutableSettings.settingsBuilder()
+        .put(clientSettings)
+        .build();
+
+    TransportAddress address = new InetSocketTransportAddress(transportHost, transportPort);
+
+    return new TransportClient(settings).addTransportAddress(address);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
new file mode 100644
index 0000000..afe0eee
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.indexrequest;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.elasticsearch.action.index.IndexRequest;
+
+import java.util.Map;
+
+/**
+ * The default {@link IndexRequestFactory}.
+ *
+ * <p>Samza concepts are mapped to Elasticsearch concepts as follows:</p>
+ *
+ * <ul>
+ *   <li>
+ *     The index and type are derived from the stream name, which must follow the pattern
+ *     "{index-name}/{type-name}" with both parts non-empty.
+ *   </li>
+ *   <li>
+ *     The id of the document is the {@link String} representation of the
+ *     {@link OutgoingMessageEnvelope#getKey()} from {@link Object#toString()} if provided.
+ *   </li>
+ *   <li>
+ *     The source of the document is set from the {@link OutgoingMessageEnvelope#getMessage()}.
+ *     Supported types are {@code byte[]} and {@link Map} which are both
+ *     passed on without serialising.
+ *   </li>
+ *   <li>
+ *     The routing key is set from the {@link String} representation of the
+ *     {@link OutgoingMessageEnvelope#getPartitionKey()} from {@link Object#toString()} if provided.
+ *   </li>
+ * </ul>
+ */
+public class DefaultIndexRequestFactory implements IndexRequestFactory {
+
+  /**
+   * Builds the {@link IndexRequest} for a single outgoing message.
+   *
+   * @param envelope the message to index
+   * @return the index request for {@code envelope}
+   * @throws SamzaException if the stream name is not of the form "{index}/{type}", or if the
+   *     message is neither a {@code byte[]} nor a {@link Map} (including a null message)
+   */
+  @Override
+  public IndexRequest getIndexRequest(OutgoingMessageEnvelope envelope) {
+    String stream = envelope.getSystemStream().getStream();
+    // Limit -1 keeps trailing empty parts; together with the emptiness checks this rejects
+    // names such as "index/type/" or "/type" instead of silently accepting them.
+    String[] parts = stream.split("/", -1);
+    if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
+      throw new SamzaException(
+          "Elasticsearch stream name must match pattern {index}/{type}, but was: " + stream);
+    }
+    String index = parts[0];
+    String type = parts[1];
+    IndexRequest indexRequest = new IndexRequest(index, type);
+
+    Object id = envelope.getKey();
+    if (id != null) {
+      indexRequest.id(id.toString());
+    }
+
+    Object partitionKey = envelope.getPartitionKey();
+    if (partitionKey != null) {
+      indexRequest.routing(partitionKey.toString());
+    }
+
+    Object message = envelope.getMessage();
+    if (message instanceof byte[]) {
+      indexRequest.source((byte[]) message);
+    } else if (message instanceof Map) {
+      indexRequest.source((Map) message);
+    } else {
+      // Guard against null so an absent message is reported as a SamzaException, not an NPE.
+      String messageType = message == null ? "null" : message.getClass().getCanonicalName();
+      throw new SamzaException("Unsupported message type: " + messageType);
+    }
+
+    return indexRequest;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java
new file mode 100644
index 0000000..b7bd00d
--- /dev/null
+++ b/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/indexrequest/IndexRequestFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.indexrequest;
+
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.elasticsearch.action.index.IndexRequest;
+
+/**
+ * Converts a Samza {@link OutgoingMessageEnvelope} into the Elasticsearch
+ * {@link IndexRequest} that indexes it.
+ */
+public interface IndexRequestFactory {
+
+  /**
+   * Creates the index request for one outgoing message.
+   *
+   * @param envelope the outgoing message to convert
+   * @return the request used to index the message
+   */
+  IndexRequest getIndexRequest(OutgoingMessageEnvelope envelope);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/test/java/org/apache/samza/config/ElasticsearchConfigTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/config/ElasticsearchConfigTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/config/ElasticsearchConfigTest.java
new file mode 100644
index 0000000..a5861ba
--- /dev/null
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/config/ElasticsearchConfigTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the property getters of {@link ElasticsearchConfig}, using a system named "es".
+ */
+public class ElasticsearchConfigTest {
+
+  // Shared by every test; it is only read, never modified.
+  private static final ElasticsearchConfig EMPTY_CONFIG = new ElasticsearchConfig(
+      "es",
+      new MapConfig(Collections.<String, String>emptyMap()));
+
+  /** Returns a config for the "es" system that contains only the given property. */
+  private static ElasticsearchConfig configForProperty(String key, String value) {
+    Map<String, String> mapConfig = new HashMap<>();
+    mapConfig.put(key, value);
+    return new ElasticsearchConfig("es", new MapConfig(mapConfig));
+  }
+
+  @Test
+  public void testGetClientFactoryClassName() throws Exception {
+    ElasticsearchConfig config = configForProperty("systems.es.client.factory", "bar");
+
+    assertEquals("bar", config.getClientFactoryClassName());
+  }
+
+  @Test
+  public void testGetTransportHost() throws Exception {
+    assertFalse(EMPTY_CONFIG.getTransportHost().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.client.transport.host", "example.org");
+
+    assertTrue(config.getTransportHost().isPresent());
+    assertEquals("example.org", config.getTransportHost().get());
+  }
+
+  @Test
+  public void testGetTransportPort() throws Exception {
+    assertFalse(EMPTY_CONFIG.getTransportPort().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.client.transport.port", "9300");
+
+    assertTrue(config.getTransportPort().isPresent());
+    assertEquals(Integer.valueOf(9300), config.getTransportPort().get());
+  }
+
+  @Test
+  public void testGetElasticsearchSettings() throws Exception {
+    ElasticsearchConfig config = configForProperty("systems.es.client.elasticsearch.foo", "bar");
+
+    assertEquals("bar", config.getElasticseachSettings().get("foo"));
+  }
+
+  @Test
+  public void testGetBulkFlushMaxActions() throws Exception {
+    assertFalse(EMPTY_CONFIG.getBulkFlushMaxActions().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.bulk.flush.max.actions", "10");
+
+    assertTrue(config.getBulkFlushMaxActions().isPresent());
+    assertEquals(Integer.valueOf(10), config.getBulkFlushMaxActions().get());
+  }
+
+  @Test
+  public void testGetBulkFlushMaxSizeMB() throws Exception {
+    assertFalse(EMPTY_CONFIG.getBulkFlushMaxSizeMB().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.bulk.flush.max.size.mb", "10");
+
+    assertTrue(config.getBulkFlushMaxSizeMB().isPresent());
+    assertEquals(Integer.valueOf(10), config.getBulkFlushMaxSizeMB().get());
+  }
+
+  @Test
+  public void testGetBulkFlushIntervalMS() throws Exception {
+    assertFalse(EMPTY_CONFIG.getBulkFlushIntervalMS().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.bulk.flush.interval.ms", "10");
+
+    assertTrue(config.getBulkFlushIntervalMS().isPresent());
+    assertEquals(Integer.valueOf(10), config.getBulkFlushIntervalMS().get());
+  }
+
+  @Test
+  public void testGetIndexRequestFactoryClassName() throws Exception {
+    assertFalse(EMPTY_CONFIG.getIndexRequestFactoryClassName().isPresent());
+
+    ElasticsearchConfig config = configForProperty("systems.es.index.request.factory", "foo");
+
+    assertTrue(config.getIndexRequestFactoryClassName().isPresent());
+    assertEquals("foo", config.getIndexRequestFactoryClassName().get());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
new file mode 100644
index 0000000..e63d62c
--- /dev/null
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducerTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.elasticsearch.indexrequest.IndexRequestFactory;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link ElasticsearchSystemProducer}, run against mocked bulk processors and clients.
+ */
+public class ElasticsearchSystemProducerTest {
+  private static final String SYSTEM_NAME = "es";
+  public static final String SOURCE_ONE = "one";
+  public static final String SOURCE_TWO = "two";
+
+  // Mocks are instance fields rather than statics: JUnit creates a new test instance per test
+  // method, so stubbings and recorded invocations (e.g. client.close()) cannot leak between tests.
+  private final BulkProcessorFactory bulkProcessorFactory = mock(BulkProcessorFactory.class);
+  private final Client client = mock(Client.class);
+  private final IndexRequestFactory indexRequestFactory = mock(IndexRequestFactory.class);
+  private final BulkProcessor processorOne = mock(BulkProcessor.class);
+  private final BulkProcessor processorTwo = mock(BulkProcessor.class);
+  private SystemProducer producer;
+
+  @Before
+  public void setUp() throws Exception {
+    producer = new ElasticsearchSystemProducer(SYSTEM_NAME,
+                                               bulkProcessorFactory,
+                                               client,
+                                               indexRequestFactory);
+
+    // Each registered source is handed its own bulk processor.
+    when(bulkProcessorFactory.getBulkProcessor(eq(client), any(BulkProcessor.Listener.class)))
+        .thenReturn(processorOne);
+    producer.register(SOURCE_ONE);
+
+    when(bulkProcessorFactory.getBulkProcessor(eq(client), any(BulkProcessor.Listener.class)))
+        .thenReturn(processorTwo);
+    producer.register(SOURCE_TWO);
+  }
+
+  @Test
+  public void testRegisterStop() throws Exception {
+    producer.stop();
+
+    verify(processorOne).flush();
+    verify(processorTwo).flush();
+
+    verify(processorOne).close();
+    verify(processorTwo).close();
+
+    verify(client).close();
+  }
+
+  @Test
+  public void testSend() throws Exception {
+    OutgoingMessageEnvelope envelope = mock(OutgoingMessageEnvelope.class);
+    IndexRequest indexRequest = mock(IndexRequest.class);
+
+    when(indexRequestFactory.getIndexRequest(envelope)).thenReturn(indexRequest);
+
+    producer.send(SOURCE_ONE, envelope);
+
+    verify(processorOne).add(indexRequest);
+  }
+
+  @Test
+  public void testFlushNoFailedSend() throws Exception {
+    producer.flush(SOURCE_ONE);
+
+    verify(processorOne).flush();
+    verify(processorTwo, never()).flush();
+  }
+
+  @Test(expected=SamzaException.class)
+  public void testFlushFailedSendFromException() throws Exception {
+    ArgumentCaptor<BulkProcessor.Listener> listenerCaptor =
+        ArgumentCaptor.forClass(BulkProcessor.Listener.class);
+
+    // Re-register to capture the listener passed to the bulk processor factory.
+    when(bulkProcessorFactory.getBulkProcessor(eq(client), listenerCaptor.capture()))
+        .thenReturn(processorOne);
+    producer.register(SOURCE_ONE);
+
+    listenerCaptor.getValue().afterBulk(0, null, new Throwable());
+
+    producer.flush(SOURCE_ONE);
+  }
+
+  @Test(expected=SamzaException.class)
+  public void testFlushFailedSendFromFailedDocument() throws Exception {
+    ArgumentCaptor<BulkProcessor.Listener> listenerCaptor =
+        ArgumentCaptor.forClass(BulkProcessor.Listener.class);
+
+    // Re-register to capture the listener passed to the bulk processor factory.
+    when(bulkProcessorFactory.getBulkProcessor(eq(client), listenerCaptor.capture()))
+        .thenReturn(processorOne);
+    producer.register(SOURCE_ONE);
+
+    BulkResponse response = mock(BulkResponse.class);
+    when(response.hasFailures()).thenReturn(true);
+
+    listenerCaptor.getValue().afterBulk(0, null, response);
+
+    producer.flush(SOURCE_ONE);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactoryTest.java
----------------------------------------------------------------------
diff --git a/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactoryTest.java b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactoryTest.java
new file mode 100644
index 0000000..61c3e7d
--- /dev/null
+++ b/samza-elasticsearch/src/test/java/org/apache/samza/system/elasticsearch/indexrequest/DefaultIndexRequestFactoryTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.elasticsearch.indexrequest;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.base.Charsets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link DefaultIndexRequestFactory}.
+ */
+public class DefaultIndexRequestFactoryTest {
+
+  private static final IndexRequestFactory INDEX_REQUEST_FACTORY =
+      new DefaultIndexRequestFactory();
+  private static final String TYPE = "type";
+  private static final String INDEX = "index";
+  private static final Map<String, Object> EMPTY_MSG = Collections.emptyMap();
+
+  // A fresh mock for every test instance, so a test that re-stubs getStream() cannot affect others.
+  private final SystemStream system = mock(SystemStream.class);
+
+  @Before
+  public void setup() {
+    when(system.getStream()).thenReturn(INDEX + "/" + TYPE);
+  }
+
+  @Test
+  public void testGetIndexRequestStreamName() {
+    IndexRequest indexRequest = INDEX_REQUEST_FACTORY.getIndexRequest(
+        new OutgoingMessageEnvelope(system, EMPTY_MSG));
+
+    assertEquals(INDEX, indexRequest.index());
+    assertEquals(TYPE, indexRequest.type());
+  }
+
+  @Test(expected=SamzaException.class)
+  public void testGetIndexRequestInvalidStreamName() {
+    when(system.getStream()).thenReturn(INDEX);
+    INDEX_REQUEST_FACTORY.getIndexRequest(new OutgoingMessageEnvelope(system, EMPTY_MSG));
+  }
+
+  @Test
+  public void testGetIndexRequestNoId() throws Exception {
+    IndexRequest indexRequest =
+        INDEX_REQUEST_FACTORY.getIndexRequest(new OutgoingMessageEnvelope(system, EMPTY_MSG));
+
+    assertNull(indexRequest.id());
+  }
+
+  @Test
+  public void testGetIndexRequestWithId() throws Exception {
+    IndexRequest indexRequest =
+        INDEX_REQUEST_FACTORY.getIndexRequest(new OutgoingMessageEnvelope(system, "id", EMPTY_MSG));
+
+    assertEquals("id", indexRequest.id());
+  }
+
+  @Test
+  public void testGetIndexRequestNoPartitionKey() throws Exception {
+    IndexRequest indexRequest = INDEX_REQUEST_FACTORY.getIndexRequest(
+        new OutgoingMessageEnvelope(system, EMPTY_MSG));
+
+    assertNull(indexRequest.routing());
+  }
+
+  @Test
+  public void testGetIndexRequestWithPartitionKey() throws Exception {
+    IndexRequest indexRequest = INDEX_REQUEST_FACTORY.getIndexRequest(
+        new OutgoingMessageEnvelope(system, "shardKey", "id", EMPTY_MSG));
+
+    assertEquals("shardKey", indexRequest.routing());
+  }
+
+  @Test
+  public void testGetIndexRequestMessageBytes() throws Exception {
+    IndexRequest indexRequest = INDEX_REQUEST_FACTORY.getIndexRequest(
+        new OutgoingMessageEnvelope(system, "{\"foo\":\"bar\"}".getBytes(Charsets.UTF_8)));
+
+    assertEquals(Collections.singletonMap("foo", "bar"), indexRequest.sourceAsMap());
+  }
+
+  @Test
+  public void testGetIndexRequestMessageMap() throws Exception {
+    IndexRequest indexRequest = INDEX_REQUEST_FACTORY.getIndexRequest(
+        new OutgoingMessageEnvelope(system, Collections.singletonMap("foo", "bar")));
+
+    assertEquals(Collections.singletonMap("foo", "bar"), indexRequest.sourceAsMap());
+  }
+
+  @Test(expected=SamzaException.class)
+  public void testGetIndexRequestInvalidMessage() throws Exception {
+    // A String message is neither byte[] nor Map, even if its content looks like JSON.
+    INDEX_REQUEST_FACTORY.getIndexRequest(new OutgoingMessageEnvelope(system, "{'foo':'bar'}"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa6ca0b1/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index bb07a3b..19bff97 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,18 +18,26 @@
*/
include \
'samza-api',
- 'samza-core',
- 'samza-kafka',
- 'samza-kv',
- 'samza-kv-inmemory',
- 'samza-kv-rocksdb',
+ 'samza-elasticsearch',
'samza-log4j',
- 'samza-shell',
- 'samza-yarn',
- 'samza-test'
+ 'samza-shell'
+// Scala modules: the rootProject loop below appends "_<scalaVersion>" to each of their names.
+def scalaModules = [
+ 'samza-core',
+ 'samza-kafka',
+ 'samza-kv',
+ 'samza-kv-inmemory',
+ 'samza-kv-rocksdb',
+ 'samza-yarn',
+ 'samza-test'
+] as HashSet
+
+scalaModules.each {
+ include it
+}
rootProject.children.each {
- if (it.name != 'samza-api' && it.name != 'samza-shell' && it.name != 'samza-log4j') {
+ if (scalaModules.contains(it.name)) {
it.name = it.name + "_" + scalaVersion
}
}