You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/03/21 00:17:34 UTC

[1/2] FLUME-2225. Elasticsearch Sink for ES HTTP API

Repository: flume
Updated Branches:
  refs/heads/trunk 1c8b8f5e9 -> e12f0a7a4


http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
new file mode 100644
index 0000000..bef2ac6
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.collect.Maps;
+import org.apache.flume.event.SimpleEvent;
+import org.joda.time.DateTimeUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TimestampedEventTest {
+  static final long FIXED_TIME_MILLIS = 123456789L;
+
+  @Before
+  public void setFixedJodaTime() {
+    DateTimeUtils.setCurrentMillisFixed(FIXED_TIME_MILLIS);
+  }
+
+  @Test
+  public void shouldEnsureTimestampHeaderPresentInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(FIXED_TIME_MILLIS, timestampedEvent.getTimestamp());
+    assertEquals(String.valueOf(FIXED_TIME_MILLIS),
+            timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldUseExistingTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("timestamp", "-321");
+    base.setHeaders(headersWithTimestamp );
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-321L, timestampedEvent.getTimestamp());
+    assertEquals("-321", timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldUseExistingAtTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("@timestamp", "-999");
+    base.setHeaders(headersWithTimestamp );
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-999L, timestampedEvent.getTimestamp());
+    assertEquals("-999", timestampedEvent.getHeaders().get("@timestamp"));
+    assertNull(timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldPreserveBodyAndNonTimestampHeadersInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    base.setBody(new byte[] {1,2,3,4});
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("foo", "bar");
+    base.setHeaders(headersWithTimestamp );
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals("bar", timestampedEvent.getHeaders().get("foo"));
+    assertArrayEquals(base.getBody(), timestampedEvent.getBody());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
new file mode 100644
index 0000000..38e7399
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.elasticsearch.client;
+
+import java.util.Arrays;
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RoundRobinListTest {
+
+  private RoundRobinList<String> fixture;
+
+  @Before
+  public void setUp() {
+    fixture = new RoundRobinList<String>(Arrays.asList("test1", "test2"));
+  }
+
+  @Test
+  public void shouldReturnNextElement() {
+    assertEquals("test1", fixture.get());
+    assertEquals("test2", fixture.get());
+    assertEquals("test1", fixture.get());
+    assertEquals("test2", fixture.get());
+    assertEquals("test1", fixture.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
new file mode 100644
index 0000000..4b70b65
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+import org.mockito.Mock;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestElasticSearchClientFactory {
+
+  ElasticSearchClientFactory factory;
+  
+  @Mock
+  ElasticSearchEventSerializer serializer;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    factory = new ElasticSearchClientFactory();
+  }
+
+  @Test
+  public void shouldReturnTransportClient() throws Exception {
+    String[] hostNames = { "127.0.0.1" };
+    Object o = factory.getClient(ElasticSearchClientFactory.TransportClient,
+            hostNames, "test", serializer, null);
+    assertThat(o, instanceOf(ElasticSearchTransportClient.class));
+  }
+
+  @Test
+  public void shouldReturnRestClient() throws NoSuchClientTypeException {
+    String[] hostNames = { "127.0.0.1" };
+    Object o = factory.getClient(ElasticSearchClientFactory.RestClient,
+            hostNames, "test", serializer, null);
+    assertThat(o, instanceOf(ElasticSearchRestClient.class));
+  }
+
+  @Test(expected=NoSuchClientTypeException.class)
+  public void shouldThrowNoSuchClientTypeException() throws NoSuchClientTypeException {
+    String[] hostNames = {"127.0.0.1"};
+    factory.getClient("not_existing_client", hostNames, "test", null, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
new file mode 100644
index 0000000..b7d8822
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.BytesStream;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.List;
+
+import static junit.framework.Assert.assertEquals;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.common.bytes.BytesArray;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestElasticSearchRestClient {
+
+  private ElasticSearchRestClient fixture;
+
+  @Mock
+  private ElasticSearchEventSerializer serializer;
+
+  @Mock
+  private IndexNameBuilder nameBuilder;
+  
+  @Mock
+  private Event event;
+
+  @Mock
+  private HttpClient httpClient;
+
+  @Mock
+  private HttpResponse httpResponse;
+
+  @Mock
+  private StatusLine httpStatus;
+
+  @Mock
+  private HttpEntity httpEntity;
+
+  private static final String INDEX_NAME = "foo_index";
+  private static final String MESSAGE_CONTENT = "{\"body\":\"test\"}";
+  private static final String[] HOSTS = {"host1", "host2"};
+
+  @Before
+  public void setUp() throws IOException {
+    initMocks(this);
+    BytesReference bytesReference = mock(BytesReference.class);
+    BytesStream bytesStream = mock(BytesStream.class);
+
+    when(nameBuilder.getIndexName(any(Event.class))).thenReturn(INDEX_NAME);
+    when(bytesReference.toBytesArray()).thenReturn(new BytesArray(MESSAGE_CONTENT));
+    when(bytesStream.bytes()).thenReturn(bytesReference);
+    when(serializer.getContentBuilder(any(Event.class))).thenReturn(bytesStream);
+    fixture = new ElasticSearchRestClient(HOSTS, serializer, httpClient);
+  }
+
+  @Test
+  public void shouldAddNewEventWithoutTTL() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+    
+    fixture.addEvent(event, nameBuilder, "bar_type", -1);
+    fixture.execute();
+
+    verify(httpClient).execute(isA(HttpUriRequest.class));
+    verify(httpClient).execute(argument.capture());
+
+    assertEquals("http://host1/_bulk", argument.getValue().getURI().toString());
+    assertEquals("{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\"}}\n" + MESSAGE_CONTENT + "\n",
+            EntityUtils.toString(argument.getValue().getEntity()));
+  }
+
+  @Test
+  public void shouldAddNewEventWithTTL() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+
+    verify(httpClient).execute(isA(HttpUriRequest.class));
+    verify(httpClient).execute(argument.capture());
+
+    assertEquals("http://host1/_bulk", argument.getValue().getURI().toString());
+    assertEquals("{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\",\"_ttl\":\"123\"}}\n" +
+            MESSAGE_CONTENT + "\n", EntityUtils.toString(argument.getValue().getEntity()));
+  }
+
+  @Test(expected = EventDeliveryException.class)
+  public void shouldThrowEventDeliveryException() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+  }
+
+  @Test()
+  public void shouldRetryBulkOperation() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+
+    when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+
+    verify(httpClient, times(2)).execute(isA(HttpUriRequest.class));
+    verify(httpClient, times(2)).execute(argument.capture());
+
+    List<HttpPost> allValues = argument.getAllValues();
+    assertEquals("http://host1/_bulk", allValues.get(0).getURI().toString());
+    assertEquals("http://host2/_bulk", allValues.get(1).getURI().toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
new file mode 100644
index 0000000..b7b8e74
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.BytesStream;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestElasticSearchTransportClient {
+
+  private ElasticSearchTransportClient fixture;
+
+  @Mock
+  private ElasticSearchEventSerializer serializer;
+
+  @Mock
+  private IndexNameBuilder nameBuilder;
+
+  @Mock
+  private Client elasticSearchClient;
+
+  @Mock
+  private BulkRequestBuilder bulkRequestBuilder;
+
+  @Mock
+  private IndexRequestBuilder indexRequestBuilder;
+
+  @Mock
+  private Event event;
+
+  @Before
+  public void setUp() throws IOException {
+    initMocks(this);
+    BytesReference bytesReference = mock(BytesReference.class);
+    BytesStream bytesStream = mock(BytesStream.class);
+
+    when(nameBuilder.getIndexName(any(Event.class))).thenReturn("foo_index");
+    when(bytesReference.toBytes()).thenReturn("{\"body\":\"test\"}".getBytes());
+    when(bytesStream.bytes()).thenReturn(bytesReference);
+    when(serializer.getContentBuilder(any(Event.class)))
+        .thenReturn(bytesStream);
+    when(elasticSearchClient.prepareIndex(anyString(), anyString()))
+        .thenReturn(indexRequestBuilder);
+    when(indexRequestBuilder.setSource(bytesReference)).thenReturn(
+        indexRequestBuilder);
+
+    fixture = new ElasticSearchTransportClient(elasticSearchClient, serializer);
+    fixture.setBulkRequestBuilder(bulkRequestBuilder);
+  }
+
+  @Test
+  public void shouldAddNewEventWithoutTTL() throws Exception {
+    fixture.addEvent(event, nameBuilder, "bar_type", -1);
+    verify(indexRequestBuilder).setSource(
+        serializer.getContentBuilder(event).bytes());
+    verify(bulkRequestBuilder).add(indexRequestBuilder);
+  }
+
+  @Test
+  public void shouldAddNewEventWithTTL() throws Exception {
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+    verify(indexRequestBuilder).setTTL(10);
+    verify(indexRequestBuilder).setSource(
+        serializer.getContentBuilder(event).bytes());
+  }
+
+  @Test
+  public void shouldExecuteBulkRequestBuilder() throws Exception {
+    ListenableActionFuture<BulkResponse> action =
+        (ListenableActionFuture<BulkResponse>) mock(ListenableActionFuture.class);
+    BulkResponse response = mock(BulkResponse.class);
+    when(bulkRequestBuilder.execute()).thenReturn(action);
+    when(action.actionGet()).thenReturn(response);
+    when(response.hasFailures()).thenReturn(false);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+    fixture.execute();
+    verify(bulkRequestBuilder).execute();
+  }
+
+  @Test(expected = EventDeliveryException.class)
+  public void shouldThrowExceptionOnExecuteFailed() throws Exception {
+    ListenableActionFuture<BulkResponse> action =
+        (ListenableActionFuture<BulkResponse>) mock(ListenableActionFuture.class);
+    BulkResponse response = mock(BulkResponse.class);
+    when(bulkRequestBuilder.execute()).thenReturn(action);
+    when(action.actionGet()).thenReturn(response);
+    when(response.hasFailures()).thenReturn(true);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+    fixture.execute();
+  }
+}


[2/2] git commit: FLUME-2225. Elasticsearch Sink for ES HTTP API

Posted by hs...@apache.org.
FLUME-2225. Elasticsearch Sink for ES HTTP API

(Pawel Rog via Edward Sargisson, Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e12f0a7a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e12f0a7a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e12f0a7a

Branch: refs/heads/trunk
Commit: e12f0a7a46752eff2ff3aaddfa0b93fb606074d7
Parents: 1c8b8f5
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Mar 20 16:16:14 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Mar 20 16:17:26 2014 -0700

----------------------------------------------------------------------
 .../flume-ng-elasticsearch-sink/pom.xml         |  24 +-
 ...ElasticSearchIndexRequestBuilderFactory.java |  40 +--
 ...ElasticSearchIndexRequestBuilderFactory.java |  26 +-
 .../sink/elasticsearch/ElasticSearchSink.java   | 278 +++++++++----------
 .../ElasticSearchSinkConstants.java             |  29 ++
 .../sink/elasticsearch/IndexNameBuilder.java    |  42 +++
 .../elasticsearch/SimpleIndexNameBuilder.java   |  45 +++
 .../TimeBasedIndexNameBuilder.java              |  90 ++++++
 .../sink/elasticsearch/TimestampedEvent.java    |  60 ++++
 .../client/ElasticSearchClient.java             |  58 ++++
 .../client/ElasticSearchClientFactory.java      |  76 +++++
 .../client/ElasticSearchRestClient.java         | 148 ++++++++++
 .../client/ElasticSearchTransportClient.java    | 231 +++++++++++++++
 .../client/NoSuchClientTypeException.java       |  23 ++
 .../elasticsearch/client/RoundRobinList.java    |  44 +++
 .../TestElasticSearchDynamicSerializer.java     |  12 +-
 ...ElasticSearchIndexRequestBuilderFactory.java |  42 +--
 ...estElasticSearchLogStashEventSerializer.java |  14 +-
 .../elasticsearch/TestElasticSearchSink.java    | 126 ++++++---
 .../TimeBasedIndexNameBuilderTest.java          |  60 ++++
 .../elasticsearch/TimestampedEventTest.java     |  88 ++++++
 .../client/RoundRobinListTest.java              |  41 +++
 .../client/TestElasticSearchClientFactory.java  |  64 +++++
 .../client/TestElasticSearchRestClient.java     | 158 +++++++++++
 .../TestElasticSearchTransportClient.java       | 127 +++++++++
 25 files changed, 1679 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml b/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
index bdc21d1..dedb738 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
@@ -24,7 +24,7 @@
   <artifactId>flume-ng-elasticsearch-sink</artifactId>
   <name>Flume NG ElasticSearch Sink</name>
 
-    <build>
+  <build>
     <plugins>
       <plugin>
         <groupId>org.apache.rat</groupId>
@@ -51,9 +51,14 @@
     </dependency>
 
     <dependency>
-        <groupId>org.elasticsearch</groupId>
-        <artifactId>elasticsearch</artifactId>
-        <optional>true</optional>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
     </dependency>
 
     <dependency>
@@ -78,5 +83,16 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
index 6effe34..de84b95 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
@@ -19,22 +19,17 @@
 package org.apache.flume.sink.elasticsearch;
 
 import java.io.IOException;
-import java.util.Map;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.time.FastDateFormat;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.conf.ComponentConfiguration;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurableComponent;
-import org.apache.flume.event.SimpleEvent;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
-import org.joda.time.DateTimeUtils;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
 
 /**
  * Abstract base class for custom implementations of
@@ -122,37 +117,4 @@ public abstract class AbstractElasticSearchIndexRequestBuilderFactory
       IndexRequestBuilder indexRequest, String indexName,
       String indexType, Event event) throws IOException;
 
-}
-
-/**
- * {@link Event} implementation that has a timestamp.
- * The timestamp is taken from (in order of precedence):<ol>
- * <li>The "timestamp" header of the base event, if present</li>
- * <li>The "@timestamp" header of the base event, if present</li>
- * <li>The current time in millis, otherwise</li>
- * </ol>
- */
-final class TimestampedEvent extends SimpleEvent {
-
-    private final long timestamp;
-
-    TimestampedEvent(Event base) {
-      setBody(base.getBody());
-      Map<String, String> headers = Maps.newHashMap(base.getHeaders());
-      String timestampString = headers.get("timestamp");
-      if (StringUtils.isBlank(timestampString)) {
-        timestampString = headers.get("@timestamp");
-      }
-      if (StringUtils.isBlank(timestampString)) {
-        this.timestamp = DateTimeUtils.currentTimeMillis();
-        headers.put("timestamp", String.valueOf(timestamp ));
-      } else {
-        this.timestamp = Long.valueOf(timestampString);
-      }
-      setHeaders(headers);
-    }
-
-    long getTimestamp() {
-        return timestamp;
-    }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
index 8e77a1e..1ca227a 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
@@ -18,9 +18,6 @@
  */
 package org.apache.flume.sink.elasticsearch;
 
-import java.io.IOException;
-import java.util.TimeZone;
-
 import org.apache.commons.lang.time.FastDateFormat;
 import org.apache.flume.Event;
 import org.apache.flume.conf.Configurable;
@@ -28,10 +25,13 @@ import org.apache.flume.conf.ConfigurableComponent;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
 
+import java.io.IOException;
+import java.util.TimeZone;
+
 /**
- * Interface for creating ElasticSearch {@link IndexRequestBuilder}
- * instances from serialized flume events. This is configurable, so any config
- * params required should be taken through this.
+ * Interface for creating ElasticSearch {@link IndexRequestBuilder} instances
+ * from serialized flume events. This is configurable, so any config params
+ * required should be taken through this.
  */
 public interface ElasticSearchIndexRequestBuilderFactory extends Configurable,
     ConfigurableComponent {
@@ -43,16 +43,18 @@ public interface ElasticSearchIndexRequestBuilderFactory extends Configurable,
    * @return prepared ElasticSearch {@link IndexRequestBuilder} instance
    * @param client
    *          ElasticSearch {@link Client} to prepare index from
- * @param indexPrefix
+   * @param indexPrefix
    *          Prefix of index name to use -- as configured on the sink
- * @param indexType
+   * @param indexType
    *          Index type to use -- as configured on the sink
- * @param event
+   * @param event
    *          Flume event to serialize and add to index request
    * @throws IOException
    *           If an error occurs e.g. during serialization
-  */
-  IndexRequestBuilder createIndexRequest(Client client,
-      String indexPrefix, String indexType, Event event) throws IOException;
+   */
+  IndexRequestBuilder createIndexRequest(Client client, String indexPrefix,
+      String indexType, Event event) throws IOException;
+
+
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
index e38ab19..39b6db5 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -23,7 +23,6 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLU
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLUSTER_NAME;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_NAME;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_TYPE;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_TTL;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
@@ -32,12 +31,6 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SER
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER_PREFIX;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL_REGEX;
-
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -48,16 +41,8 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.apache.flume.sink.elasticsearch.client.ElasticSearchClient;
+import org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,22 +50,35 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_PREFIX;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLIENT_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_NAME_BUILDER_CLASS;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_SERIALIZER_CLASS;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER_PREFIX;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
 /**
  * A sink which reads events from a channel and writes them to ElasticSearch
  * based on the work done by https://github.com/Aconex/elasticflume.git.</p>
- *
+ * 
  * This sink supports batch reading of events from the channel and writing them
  * to ElasticSearch.</p>
- *
+ * 
  * Indexes will be rolled daily using the format 'indexname-YYYY-MM-dd' to allow
  * easier management of the index</p>
- *
+ * 
  * This sink must be configured with with mandatory parameters detailed in
- * {@link ElasticSearchSinkConstants}</p>
- * It is recommended as a secondary step the ElasticSearch indexes are optimized
- * for the specified serializer. This is not handled by the sink but is
- * typically done by deploying a config template alongside the ElasticSearch
- * deploy</p>
+ * {@link ElasticSearchSinkConstants}</p> It is recommended as a secondary step
+ * the ElasticSearch indexes are optimized for the specified serializer. This is
+ * not handled by the sink but is typically done by deploying a config template
+ * alongside the ElasticSearch deploy</p>
+ * 
  * @see http
  *      ://www.elasticsearch.org/guide/reference/api/admin-indices-templates.
  *      html
@@ -101,15 +99,19 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
   private String clusterName = DEFAULT_CLUSTER_NAME;
   private String indexName = DEFAULT_INDEX_NAME;
   private String indexType = DEFAULT_INDEX_TYPE;
-  private final Pattern pattern
-    = Pattern.compile(TTL_REGEX, Pattern.CASE_INSENSITIVE);
+  private String clientType = DEFAULT_CLIENT_TYPE;
+  private final Pattern pattern = Pattern.compile(TTL_REGEX,
+      Pattern.CASE_INSENSITIVE);
   private Matcher matcher = pattern.matcher("");
 
-  private InetSocketTransportAddress[] serverAddresses;
+  private String[] serverAddresses = null;
+
+  private ElasticSearchClient client = null;
+  private Context elasticSearchClientContext = null;
 
-  private Node node;
-  private Client client;
   private ElasticSearchIndexRequestBuilderFactory indexRequestFactory;
+  private ElasticSearchEventSerializer eventSerializer;
+  private IndexNameBuilder indexNameBuilder;
   private SinkCounter sinkCounter;
 
   /**
@@ -122,12 +124,12 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
 
   /**
    * Create an {@link ElasticSearchSink}</p>
-   *
+   * 
    * @param isLocal
    *          If <tt>true</tt> sink will be configured to only talk to an
    *          ElasticSearch instance hosted in the same JVM, should always be
    *          false is production
-   *
+   * 
    */
   @VisibleForTesting
   ElasticSearchSink(boolean isLocal) {
@@ -135,7 +137,7 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
   }
 
   @VisibleForTesting
-  InetSocketTransportAddress[] getServerAddresses() {
+  String[] getServerAddresses() {
     return serverAddresses;
   }
 
@@ -159,6 +161,16 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
     return ttlMs;
   }
 
+  @VisibleForTesting
+  ElasticSearchEventSerializer getEventSerializer() {
+    return eventSerializer;
+  }
+
+  @VisibleForTesting
+  IndexNameBuilder getIndexNameBuilder() {
+    return indexNameBuilder;
+  }
+
   @Override
   public Status process() throws EventDeliveryException {
     logger.debug("processing...");
@@ -167,47 +179,33 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
     Transaction txn = channel.getTransaction();
     try {
       txn.begin();
-      BulkRequestBuilder bulkRequest = client.prepareBulk();
-      for (int i = 0; i < batchSize; i++) {
+      int count;
+      for (count = 0; count < batchSize; ++count) {
         Event event = channel.take();
 
         if (event == null) {
           break;
         }
-
-        IndexRequestBuilder indexRequest =
-            indexRequestFactory.createIndexRequest(
-                client, indexName, indexType, event);
-
-        if (ttlMs > 0) {
-          indexRequest.setTTL(ttlMs);
-        }
-
-        bulkRequest.add(indexRequest);
+        client.addEvent(event, indexNameBuilder, indexType, ttlMs);
       }
 
-      int size = bulkRequest.numberOfActions();
-      if (size <= 0) {
+      if (count <= 0) {
         sinkCounter.incrementBatchEmptyCount();
         counterGroup.incrementAndGet("channel.underflow");
         status = Status.BACKOFF;
       } else {
-        if (size < batchSize) {
+        if (count < batchSize) {
           sinkCounter.incrementBatchUnderflowCount();
           status = Status.BACKOFF;
         } else {
           sinkCounter.incrementBatchCompleteCount();
         }
 
-        sinkCounter.addToEventDrainAttemptCount(size);
-
-        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-        if (bulkResponse.hasFailures()) {
-          throw new EventDeliveryException(bulkResponse.buildFailureMessage());
-        }
+        sinkCounter.addToEventDrainAttemptCount(count);
+        client.execute();
       }
       txn.commit();
-      sinkCounter.addToEventDrainSuccessCount(size);
+      sinkCounter.addToEventDrainSuccessCount(count);
       counterGroup.incrementAndGet("transaction.success");
     } catch (Throwable ex) {
       try {
@@ -238,22 +236,10 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
   @Override
   public void configure(Context context) {
     if (!isLocal) {
-      String[] hostNames = null;
       if (StringUtils.isNotBlank(context.getString(HOSTNAMES))) {
-        hostNames = context.getString(HOSTNAMES).split(",");
-      }
-      Preconditions.checkState(hostNames != null && hostNames.length > 0,
-          "Missing Param:" + HOSTNAMES);
-
-      serverAddresses = new InetSocketTransportAddress[hostNames.length];
-      for (int i = 0; i < hostNames.length; i++) {
-        String[] hostPort = hostNames[i].trim().split(":");
-        String host = hostPort[0].trim();
-        int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim())
-            : DEFAULT_PORT;
-        serverAddresses[i] = new InetSocketTransportAddress(host, port);
+        serverAddresses = StringUtils.deleteWhitespace(
+            context.getString(HOSTNAMES)).split(",");
       }
-
       Preconditions.checkState(serverAddresses != null
           && serverAddresses.length > 0, "Missing Param:" + HOSTNAMES);
     }
@@ -280,7 +266,14 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
           + " must be greater than 0 or not set.");
     }
 
-    String serializerClazz = "org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer";
+    if (StringUtils.isNotBlank(context.getString(CLIENT_TYPE))) {
+      clientType = context.getString(CLIENT_TYPE);
+    }
+
+    elasticSearchClientContext = new Context();
+    elasticSearchClientContext.putAll(context.getSubProperties(CLIENT_PREFIX));
+
+    String serializerClazz = DEFAULT_SERIALIZER_CLASS;
     if (StringUtils.isNotBlank(context.getString(SERIALIZER))) {
       serializerClazz = context.getString(SERIALIZER);
     }
@@ -293,17 +286,18 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
       Class<? extends Configurable> clazz = (Class<? extends Configurable>) Class
           .forName(serializerClazz);
       Configurable serializer = clazz.newInstance();
+
       if (serializer instanceof ElasticSearchIndexRequestBuilderFactory) {
-        indexRequestFactory = (ElasticSearchIndexRequestBuilderFactory) serializer;
-      } else if (serializer instanceof ElasticSearchEventSerializer){
-        indexRequestFactory = new EventSerializerIndexRequestBuilderFactory(
-            (ElasticSearchEventSerializer) serializer);
+        indexRequestFactory
+            = (ElasticSearchIndexRequestBuilderFactory) serializer;
+        indexRequestFactory.configure(serializerContext);
+      } else if (serializer instanceof ElasticSearchEventSerializer) {
+        eventSerializer = (ElasticSearchEventSerializer) serializer;
+        eventSerializer.configure(serializerContext);
       } else {
-          throw new IllegalArgumentException(
-              serializerClazz + " is neither an ElasticSearchEventSerializer"
-              + " nor an ElasticSearchIndexRequestBuilderFactory.");
+        throw new IllegalArgumentException(serializerClazz
+            + " is not an ElasticSearchEventSerializer");
       }
-      indexRequestFactory.configure(serializerContext);
     } catch (Exception e) {
       logger.error("Could not instantiate event serializer.", e);
       Throwables.propagate(e);
@@ -313,6 +307,32 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
       sinkCounter = new SinkCounter(getName());
     }
 
+    String indexNameBuilderClass = DEFAULT_INDEX_NAME_BUILDER_CLASS;
+    if (StringUtils.isNotBlank(context.getString(INDEX_NAME_BUILDER))) {
+      indexNameBuilderClass = context.getString(INDEX_NAME_BUILDER);
+    }
+
+    // Give the builder its "indexNameBuilder.*" settings plus the index name.
+    Context indexnameBuilderContext = new Context();
+    indexnameBuilderContext.putAll(
+            context.getSubProperties(INDEX_NAME_BUILDER_PREFIX));
+    indexnameBuilderContext.put(INDEX_NAME, indexName);
+    try {
+      @SuppressWarnings("unchecked")
+      Class<? extends IndexNameBuilder> clazz
+              = (Class<? extends IndexNameBuilder>) Class
+              .forName(indexNameBuilderClass);
+      indexNameBuilder = clazz.newInstance();
+      indexNameBuilder.configure(indexnameBuilderContext);
+    } catch (Exception e) {
+      logger.error("Could not instantiate index name builder.", e);
+      Throwables.propagate(e);
+    }
+
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(getName());
+    }
+
     Preconditions.checkState(StringUtils.isNotBlank(indexName),
         "Missing Param:" + INDEX_NAME);
     Preconditions.checkState(StringUtils.isNotBlank(indexType),
@@ -325,13 +345,27 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
 
   @Override
   public void start() {
+    ElasticSearchClientFactory clientFactory = new ElasticSearchClientFactory();
+
     logger.info("ElasticSearch sink {} started");
     sinkCounter.start();
     try {
-      openConnection();
+      if (isLocal) {
+        client = clientFactory.getLocalClient(
+            clientType, eventSerializer, indexRequestFactory);
+      } else {
+        client = clientFactory.getClient(clientType, serverAddresses,
+            clusterName, eventSerializer, indexRequestFactory);
+        client.configure(elasticSearchClientContext);
+      }
+      sinkCounter.incrementConnectionCreatedCount();
     } catch (Exception ex) {
+      ex.printStackTrace();
       sinkCounter.incrementConnectionFailedCount();
-      closeConnection();
+      if (client != null) {
+        client.close();
+        sinkCounter.incrementConnectionClosedCount();
+      }
     }
 
     super.start();
@@ -340,39 +374,31 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
   @Override
   public void stop() {
     logger.info("ElasticSearch sink {} stopping");
-    closeConnection();
-
+    if (client != null) {
+      client.close();
+    }
+    sinkCounter.incrementConnectionClosedCount();
     sinkCounter.stop();
     super.stop();
   }
 
-  private void openConnection() {
-    if (isLocal) {
-      logger.info("Using ElasticSearch AutoDiscovery mode");
-      openLocalDiscoveryClient();
-    } else {
-      logger.info("Using ElasticSearch hostnames: {} ",
-          Arrays.toString(serverAddresses));
-      openClient();
-    }
-    sinkCounter.incrementConnectionCreatedCount();
-  }
-
   /*
-   * Returns TTL value of ElasticSearch index in milliseconds
-   * when TTL specifier is "ms" / "s" / "m" / "h" / "d" / "w".
-   * In case of unknown specifier TTL is not set. When specifier
-   * is not provided it defaults to days in milliseconds where the number
-   * of days is parsed integer from TTL string provided by user.
-   * <p>
-   *     Elasticsearch supports ttl values being provided in the format: 1d / 1w / 1ms / 1s / 1h / 1m
-   *     specify a time unit like d (days), m (minutes), h (hours), ms (milliseconds) or w (weeks),
-   *     milliseconds is used as default unit.
-   *     http://www.elasticsearch.org/guide/reference/mapping/ttl-field/.
-   * @param   ttl TTL value provided by user in flume configuration file for the sink
-   * @return  the ttl value in milliseconds
-  */
-  private long parseTTL(String ttl){
+   * Returns TTL value of ElasticSearch index in milliseconds when TTL specifier
+   * is "ms" / "s" / "m" / "h" / "d" / "w". In case of unknown specifier TTL is
+   * not set. When specifier is not provided it defaults to days in milliseconds
+   * where the number of days is parsed integer from TTL string provided by
+   * user. <p> Elasticsearch supports ttl values being provided in the format:
+   * 1d / 1w / 1ms / 1s / 1h / 1m specify a time unit like d (days), m
+   * (minutes), h (hours), ms (milliseconds) or w (weeks), milliseconds is used
+   * as default unit.
+   * http://www.elasticsearch.org/guide/reference/mapping/ttl-field/.
+   * 
+   * @param ttl TTL value provided by user in flume configuration file for the
+   * sink
+   * 
+   * @return the ttl value in milliseconds
+   */
+  private long parseTTL(String ttl) {
     matcher = matcher.reset(ttl);
     while (matcher.find()) {
       if (matcher.group(2).equals("ms")) {
@@ -398,40 +424,4 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
     logger.info("TTL not provided. Skipping the TTL config by returning 0.");
     return 0;
   }
-
-  /*
-   * FOR TESTING ONLY...
-   *
-   * Opens a local discovery node for talking to an elasticsearch server running
-   * in the same JVM
-   */
-  private void openLocalDiscoveryClient() {
-    node = NodeBuilder.nodeBuilder().client(true).local(true).node();
-    client = node.client();
-  }
-
-  private void openClient() {
-    Settings settings = ImmutableSettings.settingsBuilder()
-        .put("cluster.name", clusterName).build();
-
-    TransportClient transport = new TransportClient(settings);
-    for (InetSocketTransportAddress host : serverAddresses) {
-      transport.addTransportAddress(host);
-    }
-    client = transport;
-  }
-
-  private void closeConnection() {
-    if (client != null) {
-      client.close();
-    }
-    client = null;
-
-    if (node != null) {
-      node.close();
-    }
-    node = null;
-
-    sinkCounter.incrementConnectionClosedCount();
-  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
index dd0c59d..da88def 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
@@ -70,6 +70,30 @@ public class ElasticSearchSinkConstants {
   public static final String SERIALIZER_PREFIX = SERIALIZER + ".";
 
   /**
+   * The fully qualified class name of the index name builder the sink
+   * should use to determine name of index where the event should be sent.
+   */
+  public static final String INDEX_NAME_BUILDER = "indexNameBuilder";
+
+  /**
+   * The prefix used to extract the configuration properties intended for the
+   * index name builder, e.g. "indexNameBuilder.dateFormat".
+   */
+  public static final String INDEX_NAME_BUILDER_PREFIX
+          = INDEX_NAME_BUILDER + ".";
+
+  /**
+   * The client type used for sending bulks to ElasticSearch
+   */
+  public static final String CLIENT_TYPE = "client";
+
+  /**
+   * The client prefix to extract the configuration that will be passed to
+   * elasticsearch client.
+   */
+  public static final String CLIENT_PREFIX = CLIENT_TYPE + ".";
+
+  /**
    * DEFAULTS USED BY THE SINK
    */
 
@@ -78,5 +102,10 @@ public class ElasticSearchSinkConstants {
   public static final String DEFAULT_INDEX_NAME = "flume";
   public static final String DEFAULT_INDEX_TYPE = "log";
   public static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
+  public static final String DEFAULT_CLIENT_TYPE = "transport";
   public static final String TTL_REGEX = "^(\\d+)(\\D*)";
+  public static final String DEFAULT_SERIALIZER_CLASS = "org.apache.flume." +
+          "sink.elasticsearch.ElasticSearchLogStashEventSerializer";
+  public static final String DEFAULT_INDEX_NAME_BUILDER_CLASS =
+          "org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java
new file mode 100644
index 0000000..1dd4415
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+
+/** Strategy for choosing the ElasticSearch index an event is written to. */
+public interface IndexNameBuilder
+        extends Configurable, ConfigurableComponent {
+  /**
+   * Returns the full name of the index that an index request for the given
+   * event should target.
+   * @param event event the index name is derived from
+   * @return index name of the form 'indexPrefix-indexDynamicName'
+   */
+  String getIndexName(Event event);
+
+  /**
+   * Returns the prefix of the index to use for an index request.
+   * @param event event the index prefix is derived from
+   * @return index prefix name
+   */
+  String getIndexPrefix(Event event);
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
new file mode 100644
index 0000000..19079af
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+
+/** Index name builder that always answers with the configured index name. */
+public class SimpleIndexNameBuilder implements IndexNameBuilder {
+  /** Index name taken from the context; unset until configure(Context). */
+  private String indexName;
+
+  @Override
+  public void configure(Context context) {
+    this.indexName = context.getString(ElasticSearchSinkConstants.INDEX_NAME);
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) { /* nothing to read */ }
+
+  @Override
+  public String getIndexName(Event event) {
+    return this.indexName;
+  }
+
+  @Override
+  public String getIndexPrefix(Event event) {
+    return this.indexName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
new file mode 100644
index 0000000..a8603a4
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+
+import java.util.TimeZone;
+
+/**
+ * Default index name builder. It builds the index name from the configured
+ * prefix and the event timestamp; the default form is "prefix-yyyy-MM-dd".
+ */
+public class TimeBasedIndexNameBuilder implements IndexNameBuilder {
+  public static final String DATE_FORMAT = "dateFormat";
+  public static final String TIME_ZONE = "timeZone";
+
+  public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
+  public static final String DEFAULT_TIME_ZONE = "Etc/UTC";
+
+  private FastDateFormat fastDateFormat = FastDateFormat.getInstance(
+      DEFAULT_DATE_FORMAT, TimeZone.getTimeZone(DEFAULT_TIME_ZONE));
+  private String indexPrefix;
+
+  @VisibleForTesting
+  FastDateFormat getFastDateFormat() {
+    return fastDateFormat;
+  }
+
+  /**
+   * Gets the name of the index to use for an index request.
+   * @param event event for which the name of the index has to be prepared
+   * @return index name of the form 'indexPrefix-formattedTimestamp'
+   */
+  @Override
+  public String getIndexName(Event event) {
+    // TimestampedEvent supplies the timestamp that selects the index.
+    long timestamp = new TimestampedEvent(event).getTimestamp();
+    return new StringBuilder(indexPrefix).append('-')
+      .append(fastDateFormat.format(timestamp)).toString();
+  }
+
+  @Override
+  public String getIndexPrefix(Event event) {
+    return indexPrefix;
+  }
+
+  /**
+   * Reads the date format, time zone and index prefix from the context;
+   * a blank date format or time zone falls back to its default.
+   */
+  @Override
+  public void configure(Context context) {
+    String dateFormatString = context.getString(DATE_FORMAT);
+    if (StringUtils.isBlank(dateFormatString)) {
+      dateFormatString = DEFAULT_DATE_FORMAT;
+    }
+    String timeZoneString = context.getString(TIME_ZONE);
+    if (StringUtils.isBlank(timeZoneString)) {
+      timeZoneString = DEFAULT_TIME_ZONE;
+    }
+    fastDateFormat = FastDateFormat.getInstance(dateFormatString,
+        TimeZone.getTimeZone(timeZoneString));
+    indexPrefix = context.getString(ElasticSearchSinkConstants.INDEX_NAME);
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java
new file mode 100644
index 0000000..c056839
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.joda.time.DateTimeUtils;
+
+import java.util.Map;
+
+/**
+ * {@link org.apache.flume.Event} implementation that has a timestamp.
+ * The timestamp is taken from (in order of precedence):<ol>
+ * <li>The "timestamp" header of the base event, if not blank</li>
+ * <li>The "@timestamp" header of the base event, if not blank</li>
+ * <li>The current time in millis, otherwise (also stored as "timestamp")</li>
+ * </ol>
+ */
+final class TimestampedEvent extends SimpleEvent {
+  private final long timestamp;
+
+  TimestampedEvent(Event base) {
+    setBody(base.getBody());
+    Map<String, String> headers = Maps.newHashMap(base.getHeaders());
+    String timestampString = headers.get("timestamp");
+    if (StringUtils.isBlank(timestampString)) {
+      timestampString = headers.get("@timestamp");
+    }
+    if (StringUtils.isBlank(timestampString)) {
+      this.timestamp = DateTimeUtils.currentTimeMillis();
+      headers.put("timestamp", String.valueOf(timestamp));
+    } else {
+      // parseLong avoids boxing; trim() tolerates padded header values.
+      this.timestamp = Long.parseLong(timestampString.trim());
+    }
+    setHeaders(headers);
+  }
+
+  long getTimestamp() {
+    return timestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java
new file mode 100644
index 0000000..655e00a
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+
+/**
+ * Contract for an ElasticSearch client that collects events into a bulk and
+ * sends that bulk to ElasticSearch.
+ */
+public interface ElasticSearchClient extends Configurable {
+
+  /**
+   * Closes the client's connection to ElasticSearch.
+   */
+  void close();
+
+  /**
+   * Adds a new event to the current bulk.
+   *
+   * @param event
+   *    Flume event to add
+   * @param indexNameBuilder
+   *    builder that generates the name of the index to feed
+   * @param indexType
+   *    name of the document type sent to the ElasticSearch cluster
+   * @param ttlMs
+   *    time to live in milliseconds; a value &lt;= 0 is ignored
+   * @throws Exception
+   *    if the event cannot be added to the bulk
+   */
+  void addEvent(Event event, IndexNameBuilder indexNameBuilder,
+      String indexType, long ttlMs) throws Exception;
+
+  /**
+   * Sends the current bulk to the ElasticSearch cluster.
+   *
+   * @throws Exception
+   *    if the bulk cannot be sent
+   */
+  void execute() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
new file mode 100644
index 0000000..873157a
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory;
+
+/**
+ * Internal ElasticSearch client factory. Responsible for creating instances
+ * of ElasticSearch clients.
+ */
+public class ElasticSearchClientFactory {
+  public static final String TransportClient = "transport";
+  public static final String RestClient = "rest";
+
+  /**
+   * Creates a client that talks to an external ElasticSearch cluster.
+   * The client type is matched case-insensitively. For the transport client
+   * the serializer takes precedence over the index request builder factory
+   * when both are supplied; the rest client requires a serializer.
+   *
+   * @param clientType
+   *    String representation of client type ("transport" or "rest")
+   * @param hostNames
+   *    Array of strings that represents hostnames with ports (hostname:port)
+   * @param clusterName
+   *    Elasticsearch cluster name used only by Transport Client
+   * @param serializer
+   *    Serializer of flume events to elasticsearch documents, may be null
+   * @param indexBuilder
+   *    Index request builder factory used by Transport Client when no
+   *    serializer is given, may be null
+   * @return a newly created client of the requested type
+   * @throws NoSuchClientTypeException
+   *    if the client type is null or unknown, or if neither a serializer nor
+   *    a usable index request builder factory was supplied for that type
+   */
+  public ElasticSearchClient getClient(String clientType, String[] hostNames,
+      String clusterName, ElasticSearchEventSerializer serializer,
+      ElasticSearchIndexRequestBuilderFactory indexBuilder) throws NoSuchClientTypeException {
+    // Constant-first comparison keeps a null clientType from raising an NPE.
+    if (TransportClient.equalsIgnoreCase(clientType)) {
+      if (serializer != null) {
+        return new ElasticSearchTransportClient(hostNames, clusterName, serializer);
+      }
+      if (indexBuilder != null) {
+        return new ElasticSearchTransportClient(hostNames, clusterName, indexBuilder);
+      }
+    } else if (RestClient.equalsIgnoreCase(clientType) && serializer != null) {
+      return new ElasticSearchRestClient(hostNames, serializer);
+    }
+    throw new NoSuchClientTypeException();
+  }
+
+  /**
+   * Used for tests only. Creates local elasticsearch instance client.
+   * Only the transport client has a local variant; requesting any other
+   * type (including "rest") results in a {@link NoSuchClientTypeException}.
+   *
+   * @param clientType Name of client to use
+   * @param serializer Serializer for the event, may be null
+   * @param indexBuilder Index builder factory, used when no serializer is
+   *    given, may be null
+   *
+   * @return Local elastic search instance client
+   * @throws NoSuchClientTypeException
+   *    if no local client can be created for the given arguments
+   */
+  public ElasticSearchClient getLocalClient(String clientType, ElasticSearchEventSerializer serializer,
+          ElasticSearchIndexRequestBuilderFactory indexBuilder) throws NoSuchClientTypeException {
+    if (TransportClient.equalsIgnoreCase(clientType)) {
+      if (serializer != null) {
+        return new ElasticSearchTransportClient(serializer);
+      }
+      if (indexBuilder != null) {
+        return new ElasticSearchTransportClient(indexBuilder);
+      }
+    }
+    throw new NoSuchClientTypeException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
new file mode 100644
index 0000000..ff95e30
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.common.bytes.BytesReference;
+
+/**
+ * Rest ElasticSearch client which is responsible for sending bulks of events to
+ * ElasticSearch using ElasticSearch HTTP API. This is configurable, so any
+ * config params required should be taken through this.
+ *
+ * <p>Events are buffered as text in the bulk format (one action line followed
+ * by one document line per event) until {@link #execute()} posts the buffer
+ * to the {@code _bulk} endpoint. Access to the buffer is guarded by a
+ * dedicated lock.
+ */
+public class ElasticSearchRestClient implements ElasticSearchClient {
+
+  private static final String INDEX_OPERATION_NAME = "index";
+  private static final String INDEX_PARAM = "_index";
+  private static final String TYPE_PARAM = "_type";
+  private static final String TTL_PARAM = "_ttl";
+  private static final String BULK_ENDPOINT = "_bulk";
+  private static final String CHARSET_NAME = "UTF-8";
+
+  private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestClient.class);
+
+  // Shared instance instead of one allocation per event.
+  private static final Gson GSON = new Gson();
+
+  private final ElasticSearchEventSerializer serializer;
+  private final RoundRobinList<String> serversList;
+
+  // bulkBuilder is replaced on every execute(), so it cannot serve as its own
+  // monitor; this lock object never changes.
+  private final Object bulkLock = new Object();
+
+  private StringBuilder bulkBuilder;
+  private HttpClient httpClient;
+
+  /**
+   * Creates a client for the given hosts. Entries that do not start with
+   * "http://" or "https://" get "http://" prepended. The passed array itself
+   * is left untouched.
+   *
+   * @param hostNames
+   *    hosts (optionally with scheme and port) to send bulks to
+   * @param serializer
+   *    serializer turning Flume events into documents
+   */
+  public ElasticSearchRestClient(String[] hostNames,
+      ElasticSearchEventSerializer serializer) {
+
+    String[] urls = new String[hostNames.length];
+    for (int i = 0; i < hostNames.length; ++i) {
+      urls[i] = withScheme(hostNames[i].trim());
+    }
+    this.serializer = serializer;
+
+    serversList = new RoundRobinList<String>(Arrays.asList(urls));
+    httpClient = new DefaultHttpClient();
+    bulkBuilder = new StringBuilder();
+  }
+
+  /**
+   * Same as {@link #ElasticSearchRestClient(String[], ElasticSearchEventSerializer)}
+   * but uses the supplied HTTP client for all requests.
+   *
+   * @param hostNames
+   *    hosts (optionally with scheme and port) to send bulks to
+   * @param serializer
+   *    serializer turning Flume events into documents
+   * @param client
+   *    HTTP client to use instead of the default one
+   */
+  @VisibleForTesting
+  public ElasticSearchRestClient(String[] hostNames,
+          ElasticSearchEventSerializer serializer, HttpClient client) {
+    this(hostNames, serializer);
+    httpClient = client;
+  }
+
+  /** Prepends "http://" unless the host already starts with an HTTP(S) scheme. */
+  private static String withScheme(String host) {
+    if (host.startsWith("http://") || host.startsWith("https://")) {
+      return host;
+    }
+    return "http://" + host;
+  }
+
+  /** No configuration is read by this client. */
+  @Override
+  public void configure(Context context) {
+  }
+
+  /** Does nothing; the underlying HTTP client is not shut down here. */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Serializes the event and appends an "index" action line plus the document
+   * line to the pending bulk.
+   *
+   * @param event
+   *    Flume event to add
+   * @param indexNameBuilder
+   *    builder providing the target index name for the event
+   * @param indexType
+   *    document type
+   * @param ttlMs
+   *    time to live in milliseconds; only sent when greater than zero
+   * @throws Exception
+   *    if the serializer fails
+   */
+  @Override
+  public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String indexType, long ttlMs) throws Exception {
+    BytesReference content = serializer.getContentBuilder(event).bytes();
+    Map<String, Map<String, String>> parameters = new HashMap<String, Map<String, String>>();
+    Map<String, String> indexParameters = new HashMap<String, String>();
+    indexParameters.put(INDEX_PARAM, indexNameBuilder.getIndexName(event));
+    indexParameters.put(TYPE_PARAM, indexType);
+    if (ttlMs > 0) {
+      indexParameters.put(TTL_PARAM, Long.toString(ttlMs));
+    }
+    parameters.put(INDEX_OPERATION_NAME, indexParameters);
+
+    // Build both lines before taking the lock to keep the critical section short.
+    String actionLine = GSON.toJson(parameters);
+    String documentLine = content.toBytesArray().toUtf8();
+    synchronized (bulkLock) {
+      bulkBuilder.append(actionLine);
+      bulkBuilder.append("\n");
+      bulkBuilder.append(documentLine);
+      bulkBuilder.append("\n");
+    }
+  }
+
+  /**
+   * Posts the pending bulk to the configured hosts, one after another, until
+   * one of them answers with HTTP 200 or every host has been tried once. The
+   * pending bulk is cleared before the first request is sent.
+   *
+   * @throws EventDeliveryException
+   *    if no host answered with HTTP 200 (this includes an empty host list)
+   * @throws Exception
+   *    if the HTTP request itself fails
+   */
+  @Override
+  public void execute() throws Exception {
+    int statusCode = 0, triesCount = 0;
+    String responseBody = null;
+    logger.info("Sending bulk request to elasticsearch cluster");
+
+    String entity;
+    synchronized (bulkLock) {
+      entity = bulkBuilder.toString();
+      bulkBuilder = new StringBuilder();
+    }
+
+    while (statusCode != HttpStatus.SC_OK && triesCount < serversList.size()) {
+      triesCount++;
+      String host = serversList.get();
+      String url = host + "/" + BULK_ENDPOINT;
+      HttpPost httpRequest = new HttpPost(url);
+      // Documents are UTF-8 text; the default StringEntity charset is not.
+      httpRequest.setEntity(new StringEntity(entity, CHARSET_NAME));
+      HttpResponse response = httpClient.execute(httpRequest);
+      statusCode = response.getStatusLine().getStatusCode();
+      logger.info("Status code from elasticsearch: " + statusCode);
+      // The response stream can only be consumed once, so read it a single
+      // time and reuse the text for both logging and error reporting.
+      responseBody = null;
+      if (response.getEntity() != null) {
+        responseBody = EntityUtils.toString(response.getEntity(), CHARSET_NAME);
+        logger.debug("Status message from elasticsearch: " + responseBody);
+      }
+    }
+
+    if (statusCode != HttpStatus.SC_OK) {
+      if (responseBody != null) {
+        throw new EventDeliveryException(responseBody);
+      } else {
+        throw new EventDeliveryException("Elasticsearch status code was: " + statusCode);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
new file mode 100644
index 0000000..e9ed0b4
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
+
+/**
+ * {@link ElasticSearchClient} implementation that queues index requests in a
+ * {@link BulkRequestBuilder} and sends them through an ElasticSearch
+ * {@link Client}. Index requests are built either from an
+ * {@link ElasticSearchEventSerializer} or from an
+ * {@link ElasticSearchIndexRequestBuilderFactory}, depending on which one the
+ * instance was constructed with.
+ */
+public class ElasticSearchTransportClient implements ElasticSearchClient {
+
+  public static final Logger logger = LoggerFactory
+      .getLogger(ElasticSearchTransportClient.class);
+
+  private InetSocketTransportAddress[] serverAddresses;
+  private ElasticSearchEventSerializer serializer;
+  private ElasticSearchIndexRequestBuilderFactory indexRequestBuilderFactory;
+  private BulkRequestBuilder bulkRequestBuilder;
+
+  private Client client;
+
+  @VisibleForTesting
+  InetSocketTransportAddress[] getServerAddresses() {
+    return serverAddresses;
+  }
+
+  @VisibleForTesting
+  void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) {
+    this.bulkRequestBuilder = bulkRequestBuilder;
+  }
+
+  /**
+   * Transport client for external cluster
+   * 
+   * @param hostNames
+   *    hosts in "hostname" or "hostname:port" form
+   * @param clusterName
+   *    name of the cluster to connect to
+   * @param serializer
+   *    serializer used to build the document source of each event
+   */
+  public ElasticSearchTransportClient(String[] hostNames, String clusterName,
+      ElasticSearchEventSerializer serializer) {
+    configureHostnames(hostNames);
+    this.serializer = serializer;
+    openClient(clusterName);
+  }
+
+  /**
+   * Transport client for external cluster
+   *
+   * @param hostNames
+   *    hosts in "hostname" or "hostname:port" form
+   * @param clusterName
+   *    name of the cluster to connect to
+   * @param indexBuilder
+   *    factory used to build the index request of each event
+   */
+  public ElasticSearchTransportClient(String[] hostNames, String clusterName,
+      ElasticSearchIndexRequestBuilderFactory indexBuilder) {
+    configureHostnames(hostNames);
+    this.indexRequestBuilderFactory = indexBuilder;
+    openClient(clusterName);
+  }
+  
+  /**
+   * Local transport client only for testing
+   * 
+   * @param indexBuilderFactory
+   *    factory used to build the index request of each event
+   */
+  public ElasticSearchTransportClient(ElasticSearchIndexRequestBuilderFactory indexBuilderFactory) {
+    this.indexRequestBuilderFactory = indexBuilderFactory;
+    openLocalDiscoveryClient();
+  }
+  
+  /**
+   * Local transport client only for testing
+   *
+   * @param serializer
+   *    serializer used to build the document source of each event
+   */
+  public ElasticSearchTransportClient(ElasticSearchEventSerializer serializer) {
+    this.serializer = serializer;
+    openLocalDiscoveryClient();
+  }
+
+  /**
+   * Used for testing
+   *
+   * @param client
+   *    ElasticSearch Client
+   * @param serializer
+   *    Event Serializer
+   */
+  public ElasticSearchTransportClient(Client client,
+      ElasticSearchEventSerializer serializer) {
+    this.client = client;
+    this.serializer = serializer;
+  }
+
+  /**
+   * Used for testing
+   *
+   * @param client ElasticSearch Client
+   * @param requestBuilderFactory Index request builder factory
+   * @throws IOException if the factory fails while creating an index request
+   */
+  public ElasticSearchTransportClient(Client client,
+            ElasticSearchIndexRequestBuilderFactory requestBuilderFactory) throws IOException {
+    this.client = client;
+    // Keep the factory; without it addEvent() would fall back to the (null)
+    // serializer and fail.
+    this.indexRequestBuilderFactory = requestBuilderFactory;
+    requestBuilderFactory.createIndexRequest(client, null, null, null);
+  }
+
+  /**
+   * Parses "hostname" / "hostname:port" strings into transport addresses.
+   * An entry that does not split into exactly two parts on ':' gets
+   * {@code DEFAULT_PORT}.
+   */
+  private void configureHostnames(String[] hostNames) {
+    logger.warn(Arrays.toString(hostNames));
+    serverAddresses = new InetSocketTransportAddress[hostNames.length];
+    for (int i = 0; i < hostNames.length; i++) {
+      String[] hostPort = hostNames[i].trim().split(":");
+      String host = hostPort[0].trim();
+      int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim())
+              : DEFAULT_PORT;
+      serverAddresses[i] = new InetSocketTransportAddress(host, port);
+    }
+  }
+  
+  /** Closes the underlying client, if any, and drops the reference to it. */
+  @Override
+  public void close() {
+    if (client != null) {
+      client.close();
+    }
+    client = null;
+  }
+
+  /**
+   * Builds an index request for the event and queues it in the current bulk,
+   * creating the bulk on first use.
+   *
+   * @param event
+   *    Flume event to add
+   * @param indexNameBuilder
+   *    provides the index name (serializer mode) or the index prefix
+   *    (request builder factory mode) for the event
+   * @param indexType
+   *    document type
+   * @param ttlMs
+   *    time to live in milliseconds; only applied when greater than zero
+   * @throws Exception
+   *    if the index request cannot be built
+   */
+  @Override
+  public void addEvent(Event event, IndexNameBuilder indexNameBuilder,
+      String indexType, long ttlMs) throws Exception {
+    if (bulkRequestBuilder == null) {
+      bulkRequestBuilder = client.prepareBulk();
+    }
+
+    IndexRequestBuilder indexRequestBuilder = null;
+    if (indexRequestBuilderFactory == null) {
+      indexRequestBuilder = client
+          .prepareIndex(indexNameBuilder.getIndexName(event), indexType)
+          .setSource(serializer.getContentBuilder(event).bytes());
+    } else {
+      indexRequestBuilder = indexRequestBuilderFactory.createIndexRequest(
+          client, indexNameBuilder.getIndexPrefix(event), indexType, event);
+    }
+
+    if (ttlMs > 0) {
+      indexRequestBuilder.setTTL(ttlMs);
+    }
+    bulkRequestBuilder.add(indexRequestBuilder);
+  }
+
+  /**
+   * Sends the queued bulk and starts a fresh one, whether or not sending
+   * succeeded. Does nothing when no event has been added yet.
+   *
+   * @throws EventDeliveryException
+   *    if the bulk response reports failures
+   * @throws Exception
+   *    if executing the bulk request fails
+   */
+  @Override
+  public void execute() throws Exception {
+    if (bulkRequestBuilder == null) {
+      // Nothing was queued since construction; avoid a NullPointerException.
+      return;
+    }
+    try {
+      logger.info("Sending bulk to elasticsearch cluster");
+      BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
+      if (bulkResponse.hasFailures()) {
+        throw new EventDeliveryException(bulkResponse.buildFailureMessage());
+      }
+    } finally {
+      bulkRequestBuilder = client.prepareBulk();
+    }
+  }
+
+  /**
+   * Open client to elasticsearch cluster
+   * 
+   * @param clusterName
+   *    value for the "cluster.name" setting
+   */
+  private void openClient(String clusterName) {
+    logger.info("Using ElasticSearch hostnames: {} ",
+        Arrays.toString(serverAddresses));
+    Settings settings = ImmutableSettings.settingsBuilder()
+        .put("cluster.name", clusterName).build();
+
+    TransportClient transportClient = new TransportClient(settings);
+    for (InetSocketTransportAddress host : serverAddresses) {
+      transportClient.addTransportAddress(host);
+    }
+    if (client != null) {
+      client.close();
+    }
+    client = transportClient;
+  }
+
+  /*
+   * FOR TESTING ONLY...
+   * 
+   * Opens a local discovery node for talking to an elasticsearch server running
+   * in the same JVM
+   */
+  private void openLocalDiscoveryClient() {
+    logger.info("Using ElasticSearch AutoDiscovery mode");
+    Node node = NodeBuilder.nodeBuilder().client(true).local(true).node();
+    if (client != null) {
+      client.close();
+    }
+    client = node.client();
+  }
+
+  /** No configuration is read by this client. */
+  @Override
+  public void configure(Context context) {
+    // Intentionally empty.
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java
new file mode 100644
index 0000000..41fbe0d
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.elasticsearch.client;
+
+/**
+ * Thrown by the client factory when no ElasticSearch client can be created
+ * for the requested client type and arguments.
+ *
+ * <p>The class is public because it appears in the {@code throws} clause of
+ * public factory methods; callers outside this package must be able to name
+ * it in a {@code catch} block.
+ */
+public class NoSuchClientTypeException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Creates the exception without a detail message. */
+  public NoSuchClientTypeException() {
+    super();
+  }
+
+  /**
+   * Creates the exception with a detail message.
+   *
+   * @param message description of the unsupported client type or arguments
+   */
+  public NoSuchClientTypeException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
new file mode 100644
index 0000000..dbad8d8
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
@@ -0,0 +1,44 @@
+package org.apache.flume.sink.elasticsearch.client;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hands out the elements of a collection one at a time, in the collection's
+ * iteration order, starting over from the beginning once the end is reached.
+ *
+ * @param <T> element type
+ */
+public class RoundRobinList<T> {
+
+  private Iterator<T> iterator;
+  private final Collection<T> elements;
+
+  /**
+   * @param elements collection to cycle through; it is referenced, not copied
+   */
+  public RoundRobinList(Collection<T> elements) {
+    this.elements = elements;
+    this.iterator = elements.iterator();
+  }
+
+  /**
+   * Returns the next element, wrapping around to the first element after the
+   * last one has been returned.
+   *
+   * @return the next element in round-robin order
+   */
+  public synchronized T get() {
+    if (!iterator.hasNext()) {
+      // End reached: restart from the first element.
+      iterator = elements.iterator();
+    }
+    return iterator.next();
+  }
+
+  /**
+   * @return number of elements in the underlying collection
+   */
+  public int size() {
+    return elements.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
index 43a4b12..d4e4654 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
@@ -18,12 +18,6 @@
  */
 package org.apache.flume.sink.elasticsearch;
 
-import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Map;
-
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
@@ -31,6 +25,12 @@ import org.elasticsearch.common.collect.Maps;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.junit.Test;
 
+import java.util.Map;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertEquals;
+
 public class TestElasticSearchDynamicSerializer {
 
   @Test

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
index 1e4e119..807a9c7 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
@@ -168,27 +168,27 @@ public class TestElasticSearchIndexRequestBuilderFactory
     assertTrue(serializer.configuredWithComponentConfiguration);
   }
 
-}
-
-class FakeEventSerializer implements ElasticSearchEventSerializer {
-
-  static final byte[] FAKE_BYTES = new byte[] {9,8,7,6};
-  boolean configuredWithContext, configuredWithComponentConfiguration;
-
-  @Override
-  public BytesStream getContentBuilder(Event event) throws IOException {
-    FastByteArrayOutputStream fbaos = new FastByteArrayOutputStream(4);
-    fbaos.write(FAKE_BYTES);
-    return fbaos;
+  /**
+   * Serializer stub for tests: always produces {@link #FAKE_BYTES} and records
+   * which {@code configure} overload has been invoked.
+   */
+  static class FakeEventSerializer implements ElasticSearchEventSerializer {
+
+    static final byte[] FAKE_BYTES = new byte[]{9, 8, 7, 6};
+    boolean configuredWithContext;
+    boolean configuredWithComponentConfiguration;
+
+    /** Returns a stream containing exactly {@link #FAKE_BYTES}; the event is ignored. */
+    @Override
+    public BytesStream getContentBuilder(Event event) throws IOException {
+      FastByteArrayOutputStream stream = new FastByteArrayOutputStream(4);
+      stream.write(FAKE_BYTES);
+      return stream;
+    }
+
+    /** Records that the {@link Context} overload was called. */
+    @Override
+    public void configure(Context context) {
+      configuredWithContext = true;
+    }
+
+    /** Records that the {@link ComponentConfiguration} overload was called. */
+    @Override
+    public void configure(ComponentConfiguration configuration) {
+      configuredWithComponentConfiguration = true;
+    }
   }
 
-  @Override
-  public void configure(Context arg0) {
-    configuredWithContext = true;
-  }
-
-  @Override
-  public void configure(ComponentConfiguration arg0) {
-    configuredWithComponentConfiguration = true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
index 9dff4b0..d2c9543 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
@@ -18,13 +18,6 @@
  */
 package org.apache.flume.sink.elasticsearch;
 
-import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Date;
-import java.util.Map;
-
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
@@ -32,6 +25,13 @@ import org.elasticsearch.common.collect.Maps;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.junit.Test;
 
+import java.util.Date;
+import java.util.Map;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertEquals;
+
 public class TestElasticSearchLogStashEventSerializer {
 
   @Test

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index 71789e8..15546c1 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -20,7 +20,6 @@ package org.apache.flume.sink.elasticsearch;
 
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
@@ -29,14 +28,15 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.lang.time.FastDateFormat;
+
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -49,7 +49,8 @@ import org.apache.flume.event.EventBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.UUID;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.io.BytesStream;
+import org.elasticsearch.common.io.FastByteArrayOutputStream;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -169,8 +170,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     fixture = new ElasticSearchSink();
     fixture.configure(new Context(parameters));
 
-    InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
-        "10.5.5.27", DEFAULT_PORT) };
+    String[] expected = { "10.5.5.27" };
 
     assertEquals("testing-cluster-name", fixture.getClusterName());
     assertEquals("testing-index-name", fixture.getIndexName());
@@ -189,8 +189,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     fixture = new ElasticSearchSink();
     fixture.configure(new Context(parameters));
 
-    InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
-        "10.5.5.27", DEFAULT_PORT) };
+    String[] expected = { "10.5.5.27" };
 
     assertEquals(DEFAULT_INDEX_NAME, fixture.getIndexName());
     assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType());
@@ -205,10 +204,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     fixture = new ElasticSearchSink();
     fixture.configure(new Context(parameters));
 
-    InetSocketTransportAddress[] expected = {
-        new InetSocketTransportAddress("10.5.5.27", DEFAULT_PORT),
-        new InetSocketTransportAddress("10.5.5.28", DEFAULT_PORT),
-        new InetSocketTransportAddress("10.5.5.29", DEFAULT_PORT) };
+    String[] expected = { "10.5.5.27", "10.5.5.28", "10.5.5.29" };
 
     assertArrayEquals(expected, fixture.getServerAddresses());
   }
@@ -220,10 +216,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     fixture = new ElasticSearchSink();
     fixture.configure(new Context(parameters));
 
-    InetSocketTransportAddress[] expected = {
-      new InetSocketTransportAddress("10.5.5.27", DEFAULT_PORT),
-      new InetSocketTransportAddress("10.5.5.28", DEFAULT_PORT),
-      new InetSocketTransportAddress("10.5.5.29", DEFAULT_PORT) };
+    String[] expected = { "10.5.5.27", "10.5.5.28", "10.5.5.29" };
 
     assertArrayEquals(expected, fixture.getServerAddresses());
   }
@@ -235,25 +228,20 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     fixture = new ElasticSearchSink();
     fixture.configure(new Context(parameters));
 
-    InetSocketTransportAddress[] expected = {
-        new InetSocketTransportAddress("10.5.5.27", 9300),
-        new InetSocketTransportAddress("10.5.5.28", 9301),
-        new InetSocketTransportAddress("10.5.5.29", 9302) };
+    String[] expected = { "10.5.5.27:9300", "10.5.5.28:9301", "10.5.5.29:9302" };
 
     assertArrayEquals(expected, fixture.getServerAddresses());
   }
 
   @Test
   public void shouldParseMultipleHostAndPortsWithWhitespaces() {
-    parameters.put(HOSTNAMES, " 10.5.5.27 : 9300 , 10.5.5.28 : 9301 , 10.5.5.29 : 9302 ");
+    parameters.put(HOSTNAMES,
+        " 10.5.5.27 : 9300 , 10.5.5.28 : 9301 , 10.5.5.29 : 9302 ");
 
     fixture = new ElasticSearchSink();
     fixture.configure(new Context(parameters));
 
-    InetSocketTransportAddress[] expected = {
-      new InetSocketTransportAddress("10.5.5.27", 9300),
-      new InetSocketTransportAddress("10.5.5.28", 9301),
-      new InetSocketTransportAddress("10.5.5.29", 9302) };
+    String[] expected = { "10.5.5.27:9300", "10.5.5.28:9301", "10.5.5.29:9302" };
 
     assertArrayEquals(expected, fixture.getServerAddresses());
   }
@@ -261,11 +249,10 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
   @Test
   public void shouldAllowCustomElasticSearchIndexRequestBuilderFactory()
       throws Exception {
-
     parameters.put(SERIALIZER,
         CustomElasticSearchIndexRequestBuilderFactory.class.getName());
 
-    Configurables.configure(fixture, new Context(parameters));
+    fixture.configure(new Context(parameters));
 
     Channel channel = bindAndStartChannel(fixture);
     Transaction tx = channel.getTransaction();
@@ -279,7 +266,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     fixture.process();
     fixture.stop();
 
-    assertEquals(fixture.getIndexName()+"-05_17_36_789",
+    assertEquals(fixture.getIndexName() + "-05_17_36_789",
         CustomElasticSearchIndexRequestBuilderFactory.actualIndexName);
     assertEquals(fixture.getIndexType(),
         CustomElasticSearchIndexRequestBuilderFactory.actualIndexType);
@@ -289,7 +276,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
   }
 
   @Test
-  public void shouldParseFullyQualifiedTTLs(){
+  public void shouldParseFullyQualifiedTTLs() {
     Map<String, Long> testTTLMap = new HashMap<String, Long>();
     testTTLMap.put("1ms", Long.valueOf(1));
     testTTLMap.put("1s", Long.valueOf(1000));
@@ -297,7 +284,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     testTTLMap.put("1h", Long.valueOf(3600000));
     testTTLMap.put("1d", Long.valueOf(86400000));
     testTTLMap.put("1w", Long.valueOf(604800000));
-    testTTLMap.put("1",  Long.valueOf(86400000));
+    testTTLMap.put("1", Long.valueOf(86400000));
 
     parameters.put(HOSTNAMES, "10.5.5.27");
     parameters.put(CLUSTER_NAME, "testing-cluster-name");
@@ -309,13 +296,10 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
       fixture = new ElasticSearchSink();
       fixture.configure(new Context(parameters));
 
-      InetSocketTransportAddress[] expected = {new InetSocketTransportAddress(
-        "10.5.5.27", DEFAULT_PORT)};
-
+      String[] expected = { "10.5.5.27" };
       assertEquals("testing-cluster-name", fixture.getClusterName());
       assertEquals("testing-index-name", fixture.getIndexName());
       assertEquals("testing-index-type", fixture.getIndexType());
-      System.out.println("TTL MS" + Long.toString(testTTLMap.get(ttl)));
       assertEquals((long) testTTLMap.get(ttl), fixture.getTTLMs());
       assertArrayEquals(expected, fixture.getServerAddresses());
 
@@ -374,10 +358,84 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     }
   }
 
+  /** configure() must instantiate the serializer class named by SERIALIZER. */
+  @Test
+  public void shouldUseSpecifiedSerializer() throws Exception {
+    Context context = new Context();
+    context.put(SERIALIZER, FakeEventSerializer.class.getName());
+
+    assertNull(fixture.getEventSerializer());
+    fixture.configure(context);
+    assertTrue(fixture.getEventSerializer() instanceof FakeEventSerializer);
+  }
+
+  @Test
+  public void shouldUseSpecifiedIndexNameBuilder() throws Exception {
+    Context context = new Context();
+    context.put(ElasticSearchSinkConstants.INDEX_NAME_BUILDER,
+        FakeIndexNameBuilder.class.getName());
+    // No builder is set until configure() creates the one named in the context.
+    assertNull(fixture.getIndexNameBuilder());
+    fixture.configure(context);
+    assertTrue(fixture.getIndexNameBuilder() instanceof FakeIndexNameBuilder);
+  }
+
   public static class FakeConfigurable implements Configurable {
     @Override
     public void configure(Context arg0) {
-        // no-op
+      // no-op
     }
   }
 }
+
+/**
+ * Test-only serializer: emits FAKE_BYTES and records which configure() ran.
+ */
+class FakeEventSerializer implements ElasticSearchEventSerializer {
+
+  static final byte[] FAKE_BYTES = { 9, 8, 7, 6 };
+  boolean configuredWithContext, configuredWithComponentConfiguration;
+
+  @Override
+  public BytesStream getContentBuilder(Event event) throws IOException {
+    FastByteArrayOutputStream stream = new FastByteArrayOutputStream(4);
+    stream.write(FAKE_BYTES); // the event itself is never inspected
+    return stream;
+  }
+
+  @Override
+  public void configure(Context context) {
+    configuredWithContext = true; // records that this overload was invoked
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    configuredWithComponentConfiguration = true;
+  }
+}
+
+/**
+ * Test-only index name builder: every name and prefix is {@link #INDEX_NAME}.
+ */
+class FakeIndexNameBuilder implements IndexNameBuilder {
+
+  static final String INDEX_NAME = "index_name";
+
+  @Override
+  public String getIndexName(Event event) {
+    return INDEX_NAME; // the event is ignored
+  }
+
+  @Override
+  public String getIndexPrefix(Event event) {
+    return INDEX_NAME; // same constant as getIndexName()
+  }
+
+  @Override
+  public void configure(Context context) { // no-op
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) { // no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java
new file mode 100644
index 0000000..678342a
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link TimeBasedIndexNameBuilder}. Each test runs against a
+ * builder configured with the index name "prefix".
+ */
+public class TimeBasedIndexNameBuilderTest {
+
+  private TimeBasedIndexNameBuilder indexNameBuilder;
+
+  /** Creates the builder under test and configures its index name. */
+  @Before
+  public void setUp() throws Exception {
+    Context context = new Context();
+    context.put(ElasticSearchSinkConstants.INDEX_NAME, "prefix");
+    indexNameBuilder = new TimeBasedIndexNameBuilder();
+    indexNameBuilder.configure(context);
+  }
+
+  /**
+   * The date format must be based on UTC. The zone's display name is requested
+   * in English explicitly: the no-argument {@code getDisplayName()} is rendered
+   * in the JVM's default locale, so the literal comparison would fail on a
+   * machine with a non-English default locale.
+   */
+  @Test
+  public void shouldUseUtcAsBasisForDateFormat() {
+    assertEquals("Coordinated Universal Time",
+        indexNameBuilder.getFastDateFormat().getTimeZone()
+            .getDisplayName(Locale.ENGLISH));
+  }
+
+  /**
+   * The index name must be the configured prefix, a dash, and the event's
+   * "timestamp" header value formatted by the builder's own date format.
+   */
+  @Test
+  public void indexNameShouldBePrefixDashFormattedTimestamp() {
+    long time = 987654321L;
+    Event event = new SimpleEvent();
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put("timestamp", Long.toString(time));
+    event.setHeaders(headers);
+    assertEquals("prefix-" + indexNameBuilder.getFastDateFormat().format(time),
+        indexNameBuilder.getIndexName(event));
+  }
+}