You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/03/21 00:17:34 UTC
[1/2] FLUME-2225. Elasticsearch Sink for ES HTTP API
Repository: flume
Updated Branches:
refs/heads/trunk 1c8b8f5e9 -> e12f0a7a4
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
new file mode 100644
index 0000000..bef2ac6
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.collect.Maps;
+import org.apache.flume.event.SimpleEvent;
+import org.joda.time.DateTimeUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for {@link TimestampedEvent}, covering where the event's
+ * timestamp comes from and that the wrapped event's body and headers are
+ * carried over.
+ */
+public class TimestampedEventTest {
+  static final long FIXED_TIME_MILLIS = 123456789L;
+
+  /** Freezes the Joda-Time clock so "now" is a known value in every test. */
+  @Before
+  public void setFixedJodaTime() {
+    DateTimeUtils.setCurrentMillisFixed(FIXED_TIME_MILLIS);
+  }
+
+  /**
+   * Puts the Joda-Time clock back on system time. DateTimeUtils holds the
+   * clock in static state, so without this the frozen value would stay in
+   * effect for any test class that runs later in the same JVM.
+   */
+  @After
+  public void restoreSystemJodaTime() {
+    DateTimeUtils.setCurrentMillisSystem();
+  }
+
+  /** With no timestamp header, the (frozen) current time is used and stored. */
+  @Test
+  public void shouldEnsureTimestampHeaderPresentInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(FIXED_TIME_MILLIS, timestampedEvent.getTimestamp());
+    assertEquals(String.valueOf(FIXED_TIME_MILLIS),
+        timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  /** An existing "timestamp" header is parsed and left untouched. */
+  @Test
+  public void shouldUseExistingTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("timestamp", "-321");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-321L, timestampedEvent.getTimestamp());
+    assertEquals("-321", timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  /**
+   * An existing "@timestamp" header is used when "timestamp" is absent, and
+   * no "timestamp" header is added in that case.
+   */
+  @Test
+  public void shouldUseExistingAtTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("@timestamp", "-999");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-999L, timestampedEvent.getTimestamp());
+    assertEquals("-999", timestampedEvent.getHeaders().get("@timestamp"));
+    assertNull(timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  /** The body and unrelated headers of the base event are carried over. */
+  @Test
+  public void shouldPreserveBodyAndNonTimestampHeadersInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    base.setBody(new byte[] {1, 2, 3, 4});
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("foo", "bar");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals("bar", timestampedEvent.getHeaders().get("foo"));
+    assertArrayEquals(base.getBody(), timestampedEvent.getBody());
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
new file mode 100644
index 0000000..38e7399
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.elasticsearch.client;
+
+import java.util.Arrays;
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link RoundRobinList#get()} hands out the elements of the
+ * backing collection in order and starts over after the last one.
+ */
+public class RoundRobinListTest {
+
+  private RoundRobinList<String> fixture;
+
+  @Before
+  public void setUp() {
+    fixture = new RoundRobinList<String>(Arrays.asList("test1", "test2"));
+  }
+
+  @Test
+  public void shouldReturnNextElement() {
+    // Two and a half passes over the two-element list.
+    String[] expectedSequence = {"test1", "test2", "test1", "test2", "test1"};
+    for (String expected : expectedSequence) {
+      assertEquals(expected, fixture.get());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
new file mode 100644
index 0000000..4b70b65
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
+import org.mockito.Mock;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Tests that {@link ElasticSearchClientFactory#getClient} maps each client
+ * type name to the matching client class and rejects unknown names.
+ */
+public class TestElasticSearchClientFactory {
+
+  ElasticSearchClientFactory factory;
+
+  @Mock
+  ElasticSearchEventSerializer serializer;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    factory = new ElasticSearchClientFactory();
+  }
+
+  @Test
+  public void shouldReturnTransportClient() throws Exception {
+    Object client = factory.getClient(
+        ElasticSearchClientFactory.TransportClient,
+        new String[] { "127.0.0.1" }, "test", serializer, null);
+
+    assertThat(client, instanceOf(ElasticSearchTransportClient.class));
+  }
+
+  @Test
+  public void shouldReturnRestClient() throws NoSuchClientTypeException {
+    Object client = factory.getClient(
+        ElasticSearchClientFactory.RestClient,
+        new String[] { "127.0.0.1" }, "test", serializer, null);
+
+    assertThat(client, instanceOf(ElasticSearchRestClient.class));
+  }
+
+  @Test(expected=NoSuchClientTypeException.class)
+  public void shouldThrowNoSuchClientTypeException() throws NoSuchClientTypeException {
+    // A client type name the factory does not know must be rejected.
+    factory.getClient("not_existing_client", new String[] {"127.0.0.1"},
+        "test", null, null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
new file mode 100644
index 0000000..b7d8822
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.BytesStream;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.List;
+
+import static junit.framework.Assert.assertEquals;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.common.bytes.BytesArray;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Unit tests for {@link ElasticSearchRestClient}, driven through a mocked
+ * {@link HttpClient}. The tests pin the bulk URL, the bulk request body and
+ * what happens when the server answers with an error status.
+ */
+public class TestElasticSearchRestClient {
+
+  private ElasticSearchRestClient fixture;
+
+  @Mock
+  private ElasticSearchEventSerializer serializer;
+
+  @Mock
+  private IndexNameBuilder nameBuilder;
+
+  @Mock
+  private Event event;
+
+  @Mock
+  private HttpClient httpClient;
+
+  @Mock
+  private HttpResponse httpResponse;
+
+  @Mock
+  private StatusLine httpStatus;
+
+  @Mock
+  private HttpEntity httpEntity;
+
+  private static final String INDEX_NAME = "foo_index";
+  private static final String MESSAGE_CONTENT = "{\"body\":\"test\"}";
+  private static final String[] HOSTS = {"host1", "host2"};
+
+  /**
+   * Wires the serializer and index name builder mocks so that every event
+   * serializes to {@link #MESSAGE_CONTENT} and is indexed into
+   * {@link #INDEX_NAME}, then builds the client under test on the mocked
+   * HTTP client.
+   */
+  @Before
+  public void setUp() throws IOException {
+    initMocks(this);
+    BytesReference bytesReference = mock(BytesReference.class);
+    BytesStream bytesStream = mock(BytesStream.class);
+
+    when(nameBuilder.getIndexName(any(Event.class))).thenReturn(INDEX_NAME);
+    when(bytesReference.toBytesArray()).thenReturn(new BytesArray(MESSAGE_CONTENT));
+    when(bytesStream.bytes()).thenReturn(bytesReference);
+    when(serializer.getContentBuilder(any(Event.class))).thenReturn(bytesStream);
+    fixture = new ElasticSearchRestClient(HOSTS, serializer, httpClient);
+  }
+
+  /**
+   * Makes the mocked HTTP client answer every request with the shared mocked
+   * response, whose status code is {@code firstStatus} on the first read and
+   * then each of {@code laterStatuses} in turn (the last one repeats).
+   */
+  private void respondWithStatus(Integer firstStatus, Integer... laterStatuses)
+      throws IOException {
+    when(httpStatus.getStatusCode()).thenReturn(firstStatus, laterStatuses);
+    when(httpResponse.getStatusLine()).thenReturn(httpStatus);
+    when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
+  }
+
+  /** A TTL of -1 must not produce a "_ttl" entry in the bulk action line. */
+  @Test
+  public void shouldAddNewEventWithoutTTL() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+    respondWithStatus(HttpStatus.SC_OK);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", -1);
+    fixture.execute();
+
+    verify(httpClient).execute(isA(HttpUriRequest.class));
+    verify(httpClient).execute(argument.capture());
+
+    assertEquals("http://host1/_bulk", argument.getValue().getURI().toString());
+    assertEquals("{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\"}}\n" + MESSAGE_CONTENT + "\n",
+        EntityUtils.toString(argument.getValue().getEntity()));
+  }
+
+  /** A positive TTL must show up as "_ttl" in the bulk action line. */
+  @Test
+  public void shouldAddNewEventWithTTL() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+    respondWithStatus(HttpStatus.SC_OK);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+
+    verify(httpClient).execute(isA(HttpUriRequest.class));
+    verify(httpClient).execute(argument.capture());
+
+    assertEquals("http://host1/_bulk", argument.getValue().getURI().toString());
+    assertEquals("{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\",\"_ttl\":\"123\"}}\n" +
+        MESSAGE_CONTENT + "\n", EntityUtils.toString(argument.getValue().getEntity()));
+  }
+
+  /** When every response is a 500, execute() must fail the delivery. */
+  @Test(expected = EventDeliveryException.class)
+  public void shouldThrowEventDeliveryException() throws Exception {
+    respondWithStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+  }
+
+  /**
+   * A 500 followed by a 200 must result in exactly two requests, the first
+   * to host1 and the second to host2.
+   */
+  @Test
+  public void shouldRetryBulkOperation() throws Exception {
+    ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class);
+    respondWithStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_OK);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 123);
+    fixture.execute();
+
+    verify(httpClient, times(2)).execute(isA(HttpUriRequest.class));
+    verify(httpClient, times(2)).execute(argument.capture());
+
+    List<HttpPost> allValues = argument.getAllValues();
+    assertEquals("http://host1/_bulk", allValues.get(0).getURI().toString());
+    assertEquals("http://host2/_bulk", allValues.get(1).getURI().toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
new file mode 100644
index 0000000..b7b8e74
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.BytesStream;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Unit tests for {@link ElasticSearchTransportClient} with the ElasticSearch
+ * {@link Client} and its request builders replaced by mocks.
+ */
+public class TestElasticSearchTransportClient {
+
+  private ElasticSearchTransportClient fixture;
+
+  @Mock
+  private ElasticSearchEventSerializer serializer;
+
+  @Mock
+  private IndexNameBuilder nameBuilder;
+
+  @Mock
+  private Client elasticSearchClient;
+
+  @Mock
+  private BulkRequestBuilder bulkRequestBuilder;
+
+  @Mock
+  private IndexRequestBuilder indexRequestBuilder;
+
+  @Mock
+  private Event event;
+
+  @Before
+  public void setUp() throws IOException {
+    initMocks(this);
+    BytesReference serializedBytes = mock(BytesReference.class);
+    BytesStream contentStream = mock(BytesStream.class);
+
+    when(nameBuilder.getIndexName(any(Event.class))).thenReturn("foo_index");
+    when(serializedBytes.toBytes()).thenReturn("{\"body\":\"test\"}".getBytes());
+    when(contentStream.bytes()).thenReturn(serializedBytes);
+    when(serializer.getContentBuilder(any(Event.class)))
+        .thenReturn(contentStream);
+    when(elasticSearchClient.prepareIndex(anyString(), anyString()))
+        .thenReturn(indexRequestBuilder);
+    when(indexRequestBuilder.setSource(serializedBytes))
+        .thenReturn(indexRequestBuilder);
+
+    fixture = new ElasticSearchTransportClient(elasticSearchClient, serializer);
+    fixture.setBulkRequestBuilder(bulkRequestBuilder);
+  }
+
+  /**
+   * Stubs the mocked bulk request builder so that executing it yields a
+   * response whose hasFailures() returns the given value.
+   */
+  private void stubBulkExecution(boolean hasFailures) {
+    ListenableActionFuture<BulkResponse> pendingBulk =
+        (ListenableActionFuture<BulkResponse>) mock(ListenableActionFuture.class);
+    BulkResponse bulkResponse = mock(BulkResponse.class);
+    when(bulkRequestBuilder.execute()).thenReturn(pendingBulk);
+    when(pendingBulk.actionGet()).thenReturn(bulkResponse);
+    when(bulkResponse.hasFailures()).thenReturn(hasFailures);
+  }
+
+  @Test
+  public void shouldAddNewEventWithoutTTL() throws Exception {
+    fixture.addEvent(event, nameBuilder, "bar_type", -1);
+
+    BytesReference expectedSource = serializer.getContentBuilder(event).bytes();
+    verify(indexRequestBuilder).setSource(expectedSource);
+    verify(bulkRequestBuilder).add(indexRequestBuilder);
+  }
+
+  @Test
+  public void shouldAddNewEventWithTTL() throws Exception {
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+
+    verify(indexRequestBuilder).setTTL(10);
+    BytesReference expectedSource = serializer.getContentBuilder(event).bytes();
+    verify(indexRequestBuilder).setSource(expectedSource);
+  }
+
+  @Test
+  public void shouldExecuteBulkRequestBuilder() throws Exception {
+    stubBulkExecution(false);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+    fixture.execute();
+
+    verify(bulkRequestBuilder).execute();
+  }
+
+  @Test(expected = EventDeliveryException.class)
+  public void shouldThrowExceptionOnExecuteFailed() throws Exception {
+    // A bulk response that reports failures must surface as an exception.
+    stubBulkExecution(true);
+
+    fixture.addEvent(event, nameBuilder, "bar_type", 10);
+    fixture.execute();
+  }
+}
[2/2] git commit: FLUME-2225. Elasticsearch Sink for ES HTTP API
Posted by hs...@apache.org.
FLUME-2225. Elasticsearch Sink for ES HTTP API
(Pawel Rog via Edward Sargisson, Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e12f0a7a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e12f0a7a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e12f0a7a
Branch: refs/heads/trunk
Commit: e12f0a7a46752eff2ff3aaddfa0b93fb606074d7
Parents: 1c8b8f5
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Mar 20 16:16:14 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Mar 20 16:17:26 2014 -0700
----------------------------------------------------------------------
.../flume-ng-elasticsearch-sink/pom.xml | 24 +-
...ElasticSearchIndexRequestBuilderFactory.java | 40 +--
...ElasticSearchIndexRequestBuilderFactory.java | 26 +-
.../sink/elasticsearch/ElasticSearchSink.java | 278 +++++++++----------
.../ElasticSearchSinkConstants.java | 29 ++
.../sink/elasticsearch/IndexNameBuilder.java | 42 +++
.../elasticsearch/SimpleIndexNameBuilder.java | 45 +++
.../TimeBasedIndexNameBuilder.java | 90 ++++++
.../sink/elasticsearch/TimestampedEvent.java | 60 ++++
.../client/ElasticSearchClient.java | 58 ++++
.../client/ElasticSearchClientFactory.java | 76 +++++
.../client/ElasticSearchRestClient.java | 148 ++++++++++
.../client/ElasticSearchTransportClient.java | 231 +++++++++++++++
.../client/NoSuchClientTypeException.java | 23 ++
.../elasticsearch/client/RoundRobinList.java | 44 +++
.../TestElasticSearchDynamicSerializer.java | 12 +-
...ElasticSearchIndexRequestBuilderFactory.java | 42 +--
...estElasticSearchLogStashEventSerializer.java | 14 +-
.../elasticsearch/TestElasticSearchSink.java | 126 ++++++---
.../TimeBasedIndexNameBuilderTest.java | 60 ++++
.../elasticsearch/TimestampedEventTest.java | 88 ++++++
.../client/RoundRobinListTest.java | 41 +++
.../client/TestElasticSearchClientFactory.java | 64 +++++
.../client/TestElasticSearchRestClient.java | 158 +++++++++++
.../TestElasticSearchTransportClient.java | 127 +++++++++
25 files changed, 1679 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml b/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
index bdc21d1..dedb738 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/pom.xml
@@ -24,7 +24,7 @@
<artifactId>flume-ng-elasticsearch-sink</artifactId>
<name>Flume NG ElasticSearch Sink</name>
- <build>
+ <build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
@@ -51,9 +51,14 @@
</dependency>
<dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <optional>true</optional>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
</dependency>
<dependency>
@@ -78,5 +83,16 @@
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
index 6effe34..de84b95 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
@@ -19,22 +19,17 @@
package org.apache.flume.sink.elasticsearch;
import java.io.IOException;
-import java.util.Map;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurableComponent;
-import org.apache.flume.event.SimpleEvent;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
-import org.joda.time.DateTimeUtils;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
/**
* Abstract base class for custom implementations of
@@ -122,37 +117,4 @@ public abstract class AbstractElasticSearchIndexRequestBuilderFactory
IndexRequestBuilder indexRequest, String indexName,
String indexType, Event event) throws IOException;
-}
-
-/**
- * {@link Event} implementation that has a timestamp.
- * The timestamp is taken from (in order of precedence):<ol>
- * <li>The "timestamp" header of the base event, if present</li>
- * <li>The "@timestamp" header of the base event, if present</li>
- * <li>The current time in millis, otherwise</li>
- * </ol>
- */
-final class TimestampedEvent extends SimpleEvent {
-
- private final long timestamp;
-
- TimestampedEvent(Event base) {
- setBody(base.getBody());
- Map<String, String> headers = Maps.newHashMap(base.getHeaders());
- String timestampString = headers.get("timestamp");
- if (StringUtils.isBlank(timestampString)) {
- timestampString = headers.get("@timestamp");
- }
- if (StringUtils.isBlank(timestampString)) {
- this.timestamp = DateTimeUtils.currentTimeMillis();
- headers.put("timestamp", String.valueOf(timestamp ));
- } else {
- this.timestamp = Long.valueOf(timestampString);
- }
- setHeaders(headers);
- }
-
- long getTimestamp() {
- return timestamp;
- }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
index 8e77a1e..1ca227a 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
@@ -18,9 +18,6 @@
*/
package org.apache.flume.sink.elasticsearch;
-import java.io.IOException;
-import java.util.TimeZone;
-
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;
@@ -28,10 +25,13 @@ import org.apache.flume.conf.ConfigurableComponent;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
+import java.io.IOException;
+import java.util.TimeZone;
+
/**
- * Interface for creating ElasticSearch {@link IndexRequestBuilder}
- * instances from serialized flume events. This is configurable, so any config
- * params required should be taken through this.
+ * Interface for creating ElasticSearch {@link IndexRequestBuilder} instances
+ * from serialized flume events. This is configurable, so any config params
+ * required should be taken through this.
*/
public interface ElasticSearchIndexRequestBuilderFactory extends Configurable,
ConfigurableComponent {
@@ -43,16 +43,18 @@ public interface ElasticSearchIndexRequestBuilderFactory extends Configurable,
* @return prepared ElasticSearch {@link IndexRequestBuilder} instance
* @param client
* ElasticSearch {@link Client} to prepare index from
- * @param indexPrefix
+ * @param indexPrefix
* Prefix of index name to use -- as configured on the sink
- * @param indexType
+ * @param indexType
* Index type to use -- as configured on the sink
- * @param event
+ * @param event
* Flume event to serialize and add to index request
* @throws IOException
* If an error occurs e.g. during serialization
- */
- IndexRequestBuilder createIndexRequest(Client client,
- String indexPrefix, String indexType, Event event) throws IOException;
+ */
+ IndexRequestBuilder createIndexRequest(Client client, String indexPrefix,
+ String indexType, Event event) throws IOException;
+
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
index e38ab19..39b6db5 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -23,7 +23,6 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLU
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLUSTER_NAME;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_NAME;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_TYPE;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_TTL;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
@@ -32,12 +31,6 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SER
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER_PREFIX;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL_REGEX;
-
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -48,16 +41,8 @@ import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.apache.flume.sink.elasticsearch.client.ElasticSearchClient;
+import org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,22 +50,35 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_PREFIX;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLIENT_TYPE;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_NAME_BUILDER_CLASS;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_SERIALIZER_CLASS;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER_PREFIX;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
/**
* A sink which reads events from a channel and writes them to ElasticSearch
* based on the work done by https://github.com/Aconex/elasticflume.git.</p>
- *
+ *
* This sink supports batch reading of events from the channel and writing them
* to ElasticSearch.</p>
- *
+ *
* Indexes will be rolled daily using the format 'indexname-YYYY-MM-dd' to allow
* easier management of the index</p>
- *
+ *
* This sink must be configured with with mandatory parameters detailed in
- * {@link ElasticSearchSinkConstants}</p>
- * It is recommended as a secondary step the ElasticSearch indexes are optimized
- * for the specified serializer. This is not handled by the sink but is
- * typically done by deploying a config template alongside the ElasticSearch
- * deploy</p>
+ * {@link ElasticSearchSinkConstants}</p> It is recommended as a secondary step
+ * the ElasticSearch indexes are optimized for the specified serializer. This is
+ * not handled by the sink but is typically done by deploying a config template
+ * alongside the ElasticSearch deploy</p>
+ *
* @see http
* ://www.elasticsearch.org/guide/reference/api/admin-indices-templates.
* html
@@ -101,15 +99,19 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
private String clusterName = DEFAULT_CLUSTER_NAME;
private String indexName = DEFAULT_INDEX_NAME;
private String indexType = DEFAULT_INDEX_TYPE;
- private final Pattern pattern
- = Pattern.compile(TTL_REGEX, Pattern.CASE_INSENSITIVE);
+ private String clientType = DEFAULT_CLIENT_TYPE;
+ private final Pattern pattern = Pattern.compile(TTL_REGEX,
+ Pattern.CASE_INSENSITIVE);
private Matcher matcher = pattern.matcher("");
- private InetSocketTransportAddress[] serverAddresses;
+ private String[] serverAddresses = null;
+
+ private ElasticSearchClient client = null;
+ private Context elasticSearchClientContext = null;
- private Node node;
- private Client client;
private ElasticSearchIndexRequestBuilderFactory indexRequestFactory;
+ private ElasticSearchEventSerializer eventSerializer;
+ private IndexNameBuilder indexNameBuilder;
private SinkCounter sinkCounter;
/**
@@ -122,12 +124,12 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
/**
* Create an {@link ElasticSearchSink}</p>
- *
+ *
* @param isLocal
* If <tt>true</tt> sink will be configured to only talk to an
* ElasticSearch instance hosted in the same JVM, should always be
* false is production
- *
+ *
*/
@VisibleForTesting
ElasticSearchSink(boolean isLocal) {
@@ -135,7 +137,7 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
}
@VisibleForTesting
- InetSocketTransportAddress[] getServerAddresses() {
+ String[] getServerAddresses() {
return serverAddresses;
}
@@ -159,6 +161,16 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
return ttlMs;
}
+ @VisibleForTesting
+ ElasticSearchEventSerializer getEventSerializer() {
+ return eventSerializer; // set when the configured serializer is an ElasticSearchEventSerializer
+ }
+
+ @VisibleForTesting
+ IndexNameBuilder getIndexNameBuilder() {
+ return indexNameBuilder; // instance created reflectively from the configured builder class
+ }
+
@Override
public Status process() throws EventDeliveryException {
logger.debug("processing...");
@@ -167,47 +179,33 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
Transaction txn = channel.getTransaction();
try {
txn.begin();
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- for (int i = 0; i < batchSize; i++) {
+ int count;
+ for (count = 0; count < batchSize; ++count) {
Event event = channel.take();
if (event == null) {
break;
}
-
- IndexRequestBuilder indexRequest =
- indexRequestFactory.createIndexRequest(
- client, indexName, indexType, event);
-
- if (ttlMs > 0) {
- indexRequest.setTTL(ttlMs);
- }
-
- bulkRequest.add(indexRequest);
+ client.addEvent(event, indexNameBuilder, indexType, ttlMs);
}
- int size = bulkRequest.numberOfActions();
- if (size <= 0) {
+ if (count <= 0) {
sinkCounter.incrementBatchEmptyCount();
counterGroup.incrementAndGet("channel.underflow");
status = Status.BACKOFF;
} else {
- if (size < batchSize) {
+ if (count < batchSize) {
sinkCounter.incrementBatchUnderflowCount();
status = Status.BACKOFF;
} else {
sinkCounter.incrementBatchCompleteCount();
}
- sinkCounter.addToEventDrainAttemptCount(size);
-
- BulkResponse bulkResponse = bulkRequest.execute().actionGet();
- if (bulkResponse.hasFailures()) {
- throw new EventDeliveryException(bulkResponse.buildFailureMessage());
- }
+ sinkCounter.addToEventDrainAttemptCount(count);
+ client.execute();
}
txn.commit();
- sinkCounter.addToEventDrainSuccessCount(size);
+ sinkCounter.addToEventDrainSuccessCount(count);
counterGroup.incrementAndGet("transaction.success");
} catch (Throwable ex) {
try {
@@ -238,22 +236,10 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
@Override
public void configure(Context context) {
if (!isLocal) {
- String[] hostNames = null;
if (StringUtils.isNotBlank(context.getString(HOSTNAMES))) {
- hostNames = context.getString(HOSTNAMES).split(",");
- }
- Preconditions.checkState(hostNames != null && hostNames.length > 0,
- "Missing Param:" + HOSTNAMES);
-
- serverAddresses = new InetSocketTransportAddress[hostNames.length];
- for (int i = 0; i < hostNames.length; i++) {
- String[] hostPort = hostNames[i].trim().split(":");
- String host = hostPort[0].trim();
- int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim())
- : DEFAULT_PORT;
- serverAddresses[i] = new InetSocketTransportAddress(host, port);
+ serverAddresses = StringUtils.deleteWhitespace(
+ context.getString(HOSTNAMES)).split(",");
}
-
Preconditions.checkState(serverAddresses != null
&& serverAddresses.length > 0, "Missing Param:" + HOSTNAMES);
}
@@ -280,7 +266,14 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
+ " must be greater than 0 or not set.");
}
- String serializerClazz = "org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer";
+ if (StringUtils.isNotBlank(context.getString(CLIENT_TYPE))) {
+ clientType = context.getString(CLIENT_TYPE);
+ }
+
+ elasticSearchClientContext = new Context();
+ elasticSearchClientContext.putAll(context.getSubProperties(CLIENT_PREFIX));
+
+ String serializerClazz = DEFAULT_SERIALIZER_CLASS;
if (StringUtils.isNotBlank(context.getString(SERIALIZER))) {
serializerClazz = context.getString(SERIALIZER);
}
@@ -293,17 +286,18 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
Class<? extends Configurable> clazz = (Class<? extends Configurable>) Class
.forName(serializerClazz);
Configurable serializer = clazz.newInstance();
+
if (serializer instanceof ElasticSearchIndexRequestBuilderFactory) {
- indexRequestFactory = (ElasticSearchIndexRequestBuilderFactory) serializer;
- } else if (serializer instanceof ElasticSearchEventSerializer){
- indexRequestFactory = new EventSerializerIndexRequestBuilderFactory(
- (ElasticSearchEventSerializer) serializer);
+ indexRequestFactory
+ = (ElasticSearchIndexRequestBuilderFactory) serializer;
+ indexRequestFactory.configure(serializerContext);
+ } else if (serializer instanceof ElasticSearchEventSerializer) {
+ eventSerializer = (ElasticSearchEventSerializer) serializer;
+ eventSerializer.configure(serializerContext);
} else {
- throw new IllegalArgumentException(
- serializerClazz + " is neither an ElasticSearchEventSerializer"
- + " nor an ElasticSearchIndexRequestBuilderFactory.");
+ throw new IllegalArgumentException(serializerClazz
+ + " is not an ElasticSearchEventSerializer");
}
- indexRequestFactory.configure(serializerContext);
} catch (Exception e) {
logger.error("Could not instantiate event serializer.", e);
Throwables.propagate(e);
@@ -313,6 +307,32 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
sinkCounter = new SinkCounter(getName());
}
+ String indexNameBuilderClass = DEFAULT_INDEX_NAME_BUILDER_CLASS;
+ if (StringUtils.isNotBlank(context.getString(INDEX_NAME_BUILDER))) {
+ indexNameBuilderClass = context.getString(INDEX_NAME_BUILDER);
+ }
+
+ // Sub-properties under INDEX_NAME_BUILDER_PREFIX configure the index name builder.
+ Context indexnameBuilderContext = new Context();
+ indexnameBuilderContext.putAll(
+ context.getSubProperties(INDEX_NAME_BUILDER_PREFIX));
+ try {
+ @SuppressWarnings("unchecked")
+ Class<? extends IndexNameBuilder> clazz
+ = (Class<? extends IndexNameBuilder>) Class
+ .forName(indexNameBuilderClass);
+ indexNameBuilder = clazz.newInstance();
+ indexnameBuilderContext.put(INDEX_NAME, indexName); // builders read their name/prefix from this key
+ indexNameBuilder.configure(indexnameBuilderContext);
+ } catch (Exception e) {
+ logger.error("Could not instantiate index name builder.", e);
+ Throwables.propagate(e);
+ }
+
+ if (sinkCounter == null) {
+ sinkCounter = new SinkCounter(getName());
+ }
+
Preconditions.checkState(StringUtils.isNotBlank(indexName),
"Missing Param:" + INDEX_NAME);
Preconditions.checkState(StringUtils.isNotBlank(indexType),
@@ -325,13 +345,27 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
@Override
public void start() {
+ ElasticSearchClientFactory clientFactory = new ElasticSearchClientFactory();
+
logger.info("ElasticSearch sink {} started");
sinkCounter.start();
try {
- openConnection();
+ if (isLocal) {
+ client = clientFactory.getLocalClient(
+ clientType, eventSerializer, indexRequestFactory);
+ } else {
+ client = clientFactory.getClient(clientType, serverAddresses,
+ clusterName, eventSerializer, indexRequestFactory);
+ client.configure(elasticSearchClientContext);
+ }
+ sinkCounter.incrementConnectionCreatedCount();
} catch (Exception ex) {
+ logger.error("Could not create ElasticSearch client.", ex);
sinkCounter.incrementConnectionFailedCount();
- closeConnection();
+ if (client != null) {
+ client.close();
+ sinkCounter.incrementConnectionClosedCount();
+ }
}
super.start();
@@ -340,39 +374,31 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
@Override
public void stop() {
logger.info("ElasticSearch sink {} stopping");
- closeConnection();
-
+ if (client != null) {
+ client.close();
+ }
+ sinkCounter.incrementConnectionClosedCount();
sinkCounter.stop();
super.stop();
}
- private void openConnection() {
- if (isLocal) {
- logger.info("Using ElasticSearch AutoDiscovery mode");
- openLocalDiscoveryClient();
- } else {
- logger.info("Using ElasticSearch hostnames: {} ",
- Arrays.toString(serverAddresses));
- openClient();
- }
- sinkCounter.incrementConnectionCreatedCount();
- }
-
/*
- * Returns TTL value of ElasticSearch index in milliseconds
- * when TTL specifier is "ms" / "s" / "m" / "h" / "d" / "w".
- * In case of unknown specifier TTL is not set. When specifier
- * is not provided it defaults to days in milliseconds where the number
- * of days is parsed integer from TTL string provided by user.
- * <p>
- * Elasticsearch supports ttl values being provided in the format: 1d / 1w / 1ms / 1s / 1h / 1m
- * specify a time unit like d (days), m (minutes), h (hours), ms (milliseconds) or w (weeks),
- * milliseconds is used as default unit.
- * http://www.elasticsearch.org/guide/reference/mapping/ttl-field/.
- * @param ttl TTL value provided by user in flume configuration file for the sink
- * @return the ttl value in milliseconds
- */
- private long parseTTL(String ttl){
+ * Returns TTL value of ElasticSearch index in milliseconds when TTL specifier
+ * is "ms" / "s" / "m" / "h" / "d" / "w". In case of unknown specifier TTL is
+ * not set. When specifier is not provided it defaults to days in milliseconds
+ * where the number of days is parsed integer from TTL string provided by
+ * user. <p> Elasticsearch supports ttl values being provided in the format:
+ * 1d / 1w / 1ms / 1s / 1h / 1m specify a time unit like d (days), m
+ * (minutes), h (hours), ms (milliseconds) or w (weeks), milliseconds is used
+ * as default unit.
+ * http://www.elasticsearch.org/guide/reference/mapping/ttl-field/.
+ *
+ * @param ttl TTL value provided by user in flume configuration file for the
+ * sink
+ *
+ * @return the ttl value in milliseconds
+ */
+ private long parseTTL(String ttl) {
matcher = matcher.reset(ttl);
while (matcher.find()) {
if (matcher.group(2).equals("ms")) {
@@ -398,40 +424,4 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
logger.info("TTL not provided. Skipping the TTL config by returning 0.");
return 0;
}
-
- /*
- * FOR TESTING ONLY...
- *
- * Opens a local discovery node for talking to an elasticsearch server running
- * in the same JVM
- */
- private void openLocalDiscoveryClient() {
- node = NodeBuilder.nodeBuilder().client(true).local(true).node();
- client = node.client();
- }
-
- private void openClient() {
- Settings settings = ImmutableSettings.settingsBuilder()
- .put("cluster.name", clusterName).build();
-
- TransportClient transport = new TransportClient(settings);
- for (InetSocketTransportAddress host : serverAddresses) {
- transport.addTransportAddress(host);
- }
- client = transport;
- }
-
- private void closeConnection() {
- if (client != null) {
- client.close();
- }
- client = null;
-
- if (node != null) {
- node.close();
- }
- node = null;
-
- sinkCounter.incrementConnectionClosedCount();
- }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
index dd0c59d..da88def 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
@@ -70,6 +70,30 @@ public class ElasticSearchSinkConstants {
public static final String SERIALIZER_PREFIX = SERIALIZER + ".";
/**
+ * The fully qualified class name of the index name builder the sink
+ * should use to determine name of index where the event should be sent.
+ */
+ public static final String INDEX_NAME_BUILDER = "indexNameBuilder";
+
+ /**
+ * The prefix used to extract the configuration properties that will be
+ * passed to the index name builder.
+ */
+ public static final String INDEX_NAME_BUILDER_PREFIX
+ = INDEX_NAME_BUILDER + ".";
+
+ /**
+ * The client type used for sending bulks to ElasticSearch
+ */
+ public static final String CLIENT_TYPE = "client";
+
+ /**
+ * The client prefix to extract the configuration that will be passed to
+ * elasticsearch client.
+ */
+ public static final String CLIENT_PREFIX = CLIENT_TYPE + ".";
+
+ /**
* DEFAULTS USED BY THE SINK
*/
@@ -78,5 +102,10 @@ public class ElasticSearchSinkConstants {
public static final String DEFAULT_INDEX_NAME = "flume";
public static final String DEFAULT_INDEX_TYPE = "log";
public static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
+ public static final String DEFAULT_CLIENT_TYPE = "transport";
public static final String TTL_REGEX = "^(\\d+)(\\D*)";
+ public static final String DEFAULT_SERIALIZER_CLASS = "org.apache.flume." +
+ "sink.elasticsearch.ElasticSearchLogStashEventSerializer";
+ public static final String DEFAULT_INDEX_NAME_BUILDER_CLASS =
+ "org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder";
}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java
new file mode 100644
index 0000000..1dd4415
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+
+/** Strategy that derives the target ElasticSearch index name for an event. */
+public interface IndexNameBuilder extends Configurable, ConfigurableComponent {
+  /**
+   * Returns the name of the index to use for an index request.
+   *
+   * @param event the event which determines the index name
+   * @return name of the index the event should be written to
+   */
+  String getIndexName(Event event);
+
+  /**
+   * Returns the prefix of the index to use for an index request.
+   *
+   * @param event the event which determines the index name
+   * @return index prefix name
+   */
+  String getIndexPrefix(Event event);
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
new file mode 100644
index 0000000..19079af
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+
+/** {@link IndexNameBuilder} that uses the configured index name verbatim. */
+public class SimpleIndexNameBuilder implements IndexNameBuilder {
+  private String configuredName; // read from INDEX_NAME in configure(Context)
+
+  @Override
+  public void configure(Context context) {
+    configuredName = context.getString(ElasticSearchSinkConstants.INDEX_NAME);
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) { // intentionally empty
+  }
+
+  @Override
+  public String getIndexName(Event event) {
+    return configuredName; // the event is not consulted
+  }
+
+  @Override
+  public String getIndexPrefix(Event event) {
+    return configuredName; // name and prefix are the same value here
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
new file mode 100644
index 0000000..a8603a4
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+
+import java.util.TimeZone;
+
+/**
+ * Default index name builder. It prepares the name of the index using the
+ * configured prefix and the event timestamp, as 'prefix-yyyy-MM-dd' by default.
+ */
+public class TimeBasedIndexNameBuilder implements
+    IndexNameBuilder {
+
+  public static final String DATE_FORMAT = "dateFormat"; // context key
+  public static final String TIME_ZONE = "timeZone"; // context key
+
+  public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
+  public static final String DEFAULT_TIME_ZONE = "Etc/UTC";
+
+  private FastDateFormat fastDateFormat = FastDateFormat.getInstance(
+      DEFAULT_DATE_FORMAT, TimeZone.getTimeZone(DEFAULT_TIME_ZONE));
+
+  private String indexPrefix; // assigned in configure(Context)
+
+  @VisibleForTesting
+  FastDateFormat getFastDateFormat() {
+    return fastDateFormat;
+  }
+
+  /**
+   * Gets the name of the index to use for an index request
+   * @param event
+   *          Event for which the name of index has to be prepared
+   * @return index name of the form 'indexPrefix-formattedTimestamp'
+   */
+  @Override
+  public String getIndexName(Event event) {
+    TimestampedEvent timestampedEvent = new TimestampedEvent(event);
+    long timestamp = timestampedEvent.getTimestamp();
+    return new StringBuilder(indexPrefix).append('-')
+        .append(fastDateFormat.format(timestamp)).toString();
+  }
+
+  @Override
+  public String getIndexPrefix(Event event) {
+    return indexPrefix;
+  }
+
+  @Override
+  public void configure(Context context) {
+    String dateFormatString = context.getString(DATE_FORMAT);
+    String timeZoneString = context.getString(TIME_ZONE);
+    if (StringUtils.isBlank(dateFormatString)) {
+      dateFormatString = DEFAULT_DATE_FORMAT;
+    }
+    if (StringUtils.isBlank(timeZoneString)) {
+      timeZoneString = DEFAULT_TIME_ZONE;
+    }
+    fastDateFormat = FastDateFormat.getInstance(dateFormatString,
+        TimeZone.getTimeZone(timeZoneString));
+    indexPrefix = context.getString(ElasticSearchSinkConstants.INDEX_NAME);
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) { // intentionally empty
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java
new file mode 100644
index 0000000..c056839
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.joda.time.DateTimeUtils;
+
+import java.util.Map;
+
+/**
+ * {@link org.apache.flume.Event} implementation that has a timestamp.
+ * The timestamp is taken from (in order of precedence):<ol>
+ * <li>The "timestamp" header of the base event, if not blank</li>
+ * <li>The "@timestamp" header of the base event, if not blank</li>
+ * <li>The current time in millis, otherwise (also stored as "timestamp")</li>
+ * </ol>
+ */
+final class TimestampedEvent extends SimpleEvent {
+
+  private final long timestamp;
+
+  TimestampedEvent(Event base) {
+    setBody(base.getBody());
+    Map<String, String> headers = Maps.newHashMap(base.getHeaders()); // copy
+    String timestampString = headers.get("timestamp");
+    if (StringUtils.isBlank(timestampString)) {
+      timestampString = headers.get("@timestamp");
+    }
+    if (StringUtils.isBlank(timestampString)) {
+      this.timestamp = DateTimeUtils.currentTimeMillis();
+      headers.put("timestamp", String.valueOf(timestamp));
+    } else {
+      this.timestamp = Long.parseLong(timestampString.trim()); // isBlank allows padding
+    }
+    setHeaders(headers);
+  }
+
+  long getTimestamp() {
+    return timestamp;
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java
new file mode 100644
index 0000000..655e00a
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+
+/**
+ * Client abstraction responsible for collecting events into a bulk and sending
+ * that bulk to ElasticSearch.
+ */
+public interface ElasticSearchClient extends Configurable {
+
+  /**
+   * Closes this client's connection to ElasticSearch.
+   */
+  void close();
+
+  /**
+   * Adds a new event to the bulk.
+   *
+   * @param event
+   *          Flume event to add
+   * @param indexNameBuilder
+   *          builder which generates the name of the index to feed
+   * @param indexType
+   *          name of the document type sent to the ElasticSearch cluster
+   * @param ttlMs
+   *          time to live in milliseconds; a value {@code <= 0} is ignored
+   * @throws Exception if adding the event fails
+   */
+  void addEvent(Event event, IndexNameBuilder indexNameBuilder,
+      String indexType, long ttlMs) throws Exception;
+
+  /**
+   * Sends the bulk to the ElasticSearch cluster.
+   *
+   * @throws Exception if sending the bulk fails
+   */
+  void execute() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
new file mode 100644
index 0000000..873157a
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory;
+
+/**
+ * Internal ElasticSearch client factory. Responsible for creating instances
+ * of ElasticSearch clients.
+ */
+public class ElasticSearchClientFactory {
+  public static final String TransportClient = "transport";
+  public static final String RestClient = "rest";
+
+  /**
+   * Creates a client for an external ElasticSearch cluster.
+   *
+   * @param clientType
+   *          String representation of client type, compared case-insensitively
+   *          against {@link #TransportClient} and {@link #RestClient}
+   * @param hostNames
+   *          Array of strings that represents hostnames with ports
+   *          (hostname:port)
+   * @param clusterName
+   *          Elasticsearch cluster name used only by Transport Client
+   * @param serializer
+   *          Serializer of flume events to elasticsearch documents; takes
+   *          precedence over {@code indexBuilder} when both are given
+   * @param indexBuilder
+   *          Index request builder factory, used by the Transport Client when
+   *          no serializer is given
+   * @return client matching the requested type
+   * @throws NoSuchClientTypeException
+   *           if {@code clientType} is null or unknown, or if the serializer /
+   *           index builder required by that type is missing
+   */
+  public ElasticSearchClient getClient(String clientType, String[] hostNames,
+      String clusterName, ElasticSearchEventSerializer serializer,
+      ElasticSearchIndexRequestBuilderFactory indexBuilder) throws NoSuchClientTypeException {
+    // Constant-first comparison: a null clientType is reported as an unknown
+    // client type instead of failing with a NullPointerException.
+    if (TransportClient.equalsIgnoreCase(clientType)) {
+      if (serializer != null) {
+        return new ElasticSearchTransportClient(hostNames, clusterName, serializer);
+      }
+      if (indexBuilder != null) {
+        return new ElasticSearchTransportClient(hostNames, clusterName, indexBuilder);
+      }
+    } else if (RestClient.equalsIgnoreCase(clientType) && serializer != null) {
+      return new ElasticSearchRestClient(hostNames, serializer);
+    }
+    throw new NoSuchClientTypeException();
+  }
+
+  /**
+   * Used for tests only. Creates local elasticsearch instance client. Only the
+   * transport client type is supported here; every other type, including
+   * {@link #RestClient}, is rejected.
+   *
+   * @param clientType Name of client to use
+   * @param serializer Serializer for the event; takes precedence over
+   *          {@code indexBuilder} when both are given
+   * @param indexBuilder Index builder factory
+   *
+   * @return Local elastic search instance client
+   * @throws NoSuchClientTypeException
+   *           if {@code clientType} is null or not the transport type, or if
+   *           neither a serializer nor an index builder is given
+   */
+  public ElasticSearchClient getLocalClient(String clientType, ElasticSearchEventSerializer serializer,
+      ElasticSearchIndexRequestBuilderFactory indexBuilder) throws NoSuchClientTypeException {
+    if (TransportClient.equalsIgnoreCase(clientType)) {
+      if (serializer != null) {
+        return new ElasticSearchTransportClient(serializer);
+      }
+      if (indexBuilder != null) {
+        return new ElasticSearchTransportClient(indexBuilder);
+      }
+    }
+    throw new NoSuchClientTypeException();
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
new file mode 100644
index 0000000..ff95e30
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.common.bytes.BytesReference;
+
+/**
+ * Rest ElasticSearch client which is responsible for sending bulks of events to
+ * ElasticSearch using ElasticSearch HTTP API. This is configurable, so any
+ * config params required should be taken through this.
+ */
+public class ElasticSearchRestClient implements ElasticSearchClient {
+
+  private static final String INDEX_OPERATION_NAME = "index";
+  private static final String INDEX_PARAM = "_index";
+  private static final String TYPE_PARAM = "_type";
+  private static final String TTL_PARAM = "_ttl";
+  private static final String BULK_ENDPOINT = "_bulk";
+  private static final String HTTP_SCHEME = "http://";
+  private static final String HTTPS_SCHEME = "https://";
+  private static final String UTF_8 = "UTF-8";
+
+  private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestClient.class);
+
+  // One shared instance instead of a new Gson per event.
+  private static final Gson GSON = new Gson();
+
+  private final ElasticSearchEventSerializer serializer;
+  private final RoundRobinList<String> serversList;
+
+  // Final so that it can safely double as the lock guarding the pending bulk.
+  private final StringBuilder bulkBuilder = new StringBuilder();
+  private final HttpClient httpClient;
+
+  /**
+   * Creates a client that posts bulks with a {@link DefaultHttpClient}.
+   *
+   * @param hostNames
+   *          hosts to send bulks to; entries that do not start with
+   *          {@code http://} or {@code https://} are prefixed with
+   *          {@code http://}
+   * @param serializer
+   *          serializer of flume events to elasticsearch documents
+   */
+  public ElasticSearchRestClient(String[] hostNames,
+      ElasticSearchEventSerializer serializer) {
+    this(hostNames, serializer, new DefaultHttpClient());
+  }
+
+  /**
+   * Same as {@link #ElasticSearchRestClient(String[], ElasticSearchEventSerializer)}
+   * but with a caller supplied HTTP client.
+   *
+   * @param hostNames
+   *          hosts to send bulks to
+   * @param serializer
+   *          serializer of flume events to elasticsearch documents
+   * @param client
+   *          HTTP client used to post the bulks
+   */
+  @VisibleForTesting
+  public ElasticSearchRestClient(String[] hostNames,
+      ElasticSearchEventSerializer serializer, HttpClient client) {
+    // Build the URL list in a fresh array so the caller's array is not modified.
+    String[] urls = new String[hostNames.length];
+    for (int i = 0; i < hostNames.length; ++i) {
+      String host = hostNames[i];
+      boolean hasScheme = host.startsWith(HTTP_SCHEME) || host.startsWith(HTTPS_SCHEME);
+      urls[i] = hasScheme ? host : HTTP_SCHEME + host;
+    }
+    this.serializer = serializer;
+    this.serversList = new RoundRobinList<String>(Arrays.asList(urls));
+    this.httpClient = client;
+  }
+
+  @Override
+  public void configure(Context context) {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Appends the event to the pending bulk as two lines: the JSON encoded
+   * {@code index} action (index name, type and optional TTL) followed by the
+   * serialized event.
+   *
+   * @param ttlMs
+   *          time to live in milliseconds; only sent when {@code > 0}
+   */
+  @Override
+  public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String indexType, long ttlMs) throws Exception {
+    BytesReference content = serializer.getContentBuilder(event).bytes();
+    Map<String, Map<String, String>> parameters = new HashMap<String, Map<String, String>>();
+    Map<String, String> indexParameters = new HashMap<String, String>();
+    indexParameters.put(INDEX_PARAM, indexNameBuilder.getIndexName(event));
+    indexParameters.put(TYPE_PARAM, indexType);
+    if (ttlMs > 0) {
+      indexParameters.put(TTL_PARAM, Long.toString(ttlMs));
+    }
+    parameters.put(INDEX_OPERATION_NAME, indexParameters);
+
+    // Serialize outside the lock; only the appends need to be atomic.
+    String actionLine = GSON.toJson(parameters);
+    String documentLine = content.toBytesArray().toUtf8();
+    synchronized (bulkBuilder) {
+      bulkBuilder.append(actionLine);
+      bulkBuilder.append("\n");
+      bulkBuilder.append(documentLine);
+      bulkBuilder.append("\n");
+    }
+  }
+
+  /**
+   * Posts the pending bulk to the {@code _bulk} endpoint. Hosts are taken from
+   * the round robin list until one answers with HTTP 200 or every host has been
+   * tried once. The pending bulk is cleared before sending, whatever the
+   * outcome.
+   *
+   * @throws EventDeliveryException
+   *           if no host is configured or no host answered with HTTP 200; the
+   *           message is the body of the last response when there was one
+   * @throws Exception
+   *           if the HTTP exchange itself fails
+   */
+  @Override
+  public void execute() throws Exception {
+    int statusCode = 0, triesCount = 0;
+    String responseBody = null;
+    logger.info("Sending bulk request to elasticsearch cluster");
+
+    String entity;
+    synchronized (bulkBuilder) {
+      entity = bulkBuilder.toString();
+      bulkBuilder.setLength(0);
+    }
+
+    while (statusCode != HttpStatus.SC_OK && triesCount < serversList.size()) {
+      triesCount++;
+      String host = serversList.get();
+      String url = host + "/" + BULK_ENDPOINT;
+      HttpPost httpRequest = new HttpPost(url);
+      // The documents were rendered as UTF-8, so send them with that charset.
+      httpRequest.setEntity(new StringEntity(entity, UTF_8));
+      HttpResponse response = httpClient.execute(httpRequest);
+      statusCode = response.getStatusLine().getStatusCode();
+      logger.info("Status code from elasticsearch: " + statusCode);
+      // A response entity can only be read once: keep the text so it is still
+      // available for the failure message below.
+      responseBody = null;
+      if (response.getEntity() != null) {
+        responseBody = EntityUtils.toString(response.getEntity(), UTF_8);
+        logger.debug("Status message from elasticsearch: {}", responseBody);
+      }
+    }
+
+    if (triesCount == 0) {
+      throw new EventDeliveryException("No elasticsearch hosts configured");
+    }
+    if (statusCode != HttpStatus.SC_OK) {
+      if (responseBody != null) {
+        throw new EventDeliveryException(responseBody);
+      } else {
+        throw new EventDeliveryException("Elasticsearch status code was: " + statusCode);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
new file mode 100644
index 0000000..e9ed0b4
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
+import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
+
+/**
+ * {@link ElasticSearchClient} implementation built on the ElasticSearch Java
+ * client API. Events are collected in a {@link BulkRequestBuilder} by
+ * {@link #addEvent} and sent as one bulk by {@link #execute()}.
+ */
+public class ElasticSearchTransportClient implements ElasticSearchClient {
+
+  public static final Logger logger = LoggerFactory
+      .getLogger(ElasticSearchTransportClient.class);
+
+  private InetSocketTransportAddress[] serverAddresses;
+  private ElasticSearchEventSerializer serializer;
+  private ElasticSearchIndexRequestBuilderFactory indexRequestBuilderFactory;
+  private BulkRequestBuilder bulkRequestBuilder;
+
+  private Client client;
+  // Node started by openLocalDiscoveryClient(); kept so close() can stop it.
+  private Node localNode;
+
+  @VisibleForTesting
+  InetSocketTransportAddress[] getServerAddresses() {
+    return serverAddresses;
+  }
+
+  @VisibleForTesting
+  void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) {
+    this.bulkRequestBuilder = bulkRequestBuilder;
+  }
+
+  /**
+   * Transport client for external cluster, indexing documents produced by an
+   * event serializer.
+   *
+   * @param hostNames
+   *          hosts of the cluster, each as {@code host} or {@code host:port}
+   * @param clusterName
+   *          value of the {@code cluster.name} client setting
+   * @param serializer
+   *          serializer of flume events to elasticsearch documents
+   */
+  public ElasticSearchTransportClient(String[] hostNames, String clusterName,
+      ElasticSearchEventSerializer serializer) {
+    configureHostnames(hostNames);
+    this.serializer = serializer;
+    openClient(clusterName);
+  }
+
+  /**
+   * Transport client for external cluster, delegating index request creation
+   * to a factory.
+   *
+   * @param hostNames
+   *          hosts of the cluster, each as {@code host} or {@code host:port}
+   * @param clusterName
+   *          value of the {@code cluster.name} client setting
+   * @param indexBuilder
+   *          factory creating the index request for each event
+   */
+  public ElasticSearchTransportClient(String[] hostNames, String clusterName,
+      ElasticSearchIndexRequestBuilderFactory indexBuilder) {
+    configureHostnames(hostNames);
+    this.indexRequestBuilderFactory = indexBuilder;
+    openClient(clusterName);
+  }
+
+  /**
+   * Local transport client only for testing
+   *
+   * @param indexBuilderFactory
+   *          factory creating the index request for each event
+   */
+  public ElasticSearchTransportClient(ElasticSearchIndexRequestBuilderFactory indexBuilderFactory) {
+    this.indexRequestBuilderFactory = indexBuilderFactory;
+    openLocalDiscoveryClient();
+  }
+
+  /**
+   * Local transport client only for testing
+   *
+   * @param serializer
+   *          serializer of flume events to elasticsearch documents
+   */
+  public ElasticSearchTransportClient(ElasticSearchEventSerializer serializer) {
+    this.serializer = serializer;
+    openLocalDiscoveryClient();
+  }
+
+  /**
+   * Used for testing
+   *
+   * @param client
+   *          ElasticSearch Client
+   * @param serializer
+   *          Event Serializer
+   */
+  public ElasticSearchTransportClient(Client client,
+      ElasticSearchEventSerializer serializer) {
+    this.client = client;
+    this.serializer = serializer;
+  }
+
+  /**
+   * Used for testing
+   *
+   * @param client
+   *          ElasticSearch Client
+   * @param requestBuilderFactory
+   *          factory creating the index request for each event
+   * @throws IOException
+   *           kept in the signature for source compatibility with existing
+   *           callers; this constructor no longer performs any I/O
+   */
+  public ElasticSearchTransportClient(Client client,
+      ElasticSearchIndexRequestBuilderFactory requestBuilderFactory) throws IOException {
+    this.client = client;
+    // Store the factory so addEvent() uses it. Without this assignment
+    // addEvent() takes the serializer branch and dereferences a null
+    // serializer.
+    this.indexRequestBuilderFactory = requestBuilderFactory;
+  }
+
+  /**
+   * Parses {@code host[:port]} entries into transport addresses. Entries
+   * without an explicit port use {@code DEFAULT_PORT}.
+   */
+  private void configureHostnames(String[] hostNames) {
+    logger.debug("Configuring ElasticSearch hostnames: {}",
+        Arrays.toString(hostNames));
+    serverAddresses = new InetSocketTransportAddress[hostNames.length];
+    for (int i = 0; i < hostNames.length; i++) {
+      String[] hostPort = hostNames[i].trim().split(":");
+      String host = hostPort[0].trim();
+      int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim())
+          : DEFAULT_PORT;
+      serverAddresses[i] = new InetSocketTransportAddress(host, port);
+    }
+  }
+
+  /**
+   * Closes the client and, when this instance started a local node, that node
+   * as well.
+   */
+  @Override
+  public void close() {
+    if (client != null) {
+      client.close();
+    }
+    client = null;
+    if (localNode != null) {
+      localNode.close();
+    }
+    localNode = null;
+  }
+
+  /**
+   * Adds an index request for the event to the pending bulk. Without a request
+   * builder factory the document comes from the serializer and the index name
+   * from {@code indexNameBuilder.getIndexName(event)}; with a factory, the
+   * factory receives the index prefix and builds the request itself.
+   *
+   * @param ttlMs
+   *          time to live in milliseconds; only applied when {@code > 0}
+   */
+  @Override
+  public void addEvent(Event event, IndexNameBuilder indexNameBuilder,
+      String indexType, long ttlMs) throws Exception {
+    if (bulkRequestBuilder == null) {
+      bulkRequestBuilder = client.prepareBulk();
+    }
+
+    IndexRequestBuilder indexRequestBuilder = null;
+    if (indexRequestBuilderFactory == null) {
+      indexRequestBuilder = client
+          .prepareIndex(indexNameBuilder.getIndexName(event), indexType)
+          .setSource(serializer.getContentBuilder(event).bytes());
+    } else {
+      indexRequestBuilder = indexRequestBuilderFactory.createIndexRequest(
+          client, indexNameBuilder.getIndexPrefix(event), indexType, event);
+    }
+
+    if (ttlMs > 0) {
+      indexRequestBuilder.setTTL(ttlMs);
+    }
+    bulkRequestBuilder.add(indexRequestBuilder);
+  }
+
+  /**
+   * Sends the pending bulk and starts a fresh one, whether or not the send
+   * succeeded. Does nothing when no bulk has been started yet.
+   *
+   * @throws EventDeliveryException
+   *           if the bulk response reports failures
+   */
+  @Override
+  public void execute() throws Exception {
+    if (bulkRequestBuilder == null) {
+      // No event has been added since this client was created.
+      logger.debug("No bulk has been started; nothing to send");
+      return;
+    }
+    try {
+      logger.info("Sending bulk to elasticsearch cluster");
+      BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
+      if (bulkResponse.hasFailures()) {
+        throw new EventDeliveryException(bulkResponse.buildFailureMessage());
+      }
+    } finally {
+      bulkRequestBuilder = client.prepareBulk();
+    }
+  }
+
+  /**
+   * Open client to elasticsearch cluster
+   *
+   * @param clusterName
+   *          value of the {@code cluster.name} client setting
+   */
+  private void openClient(String clusterName) {
+    logger.info("Using ElasticSearch hostnames: {} ",
+        Arrays.toString(serverAddresses));
+    Settings settings = ImmutableSettings.settingsBuilder()
+        .put("cluster.name", clusterName).build();
+
+    TransportClient transportClient = new TransportClient(settings);
+    for (InetSocketTransportAddress host : serverAddresses) {
+      transportClient.addTransportAddress(host);
+    }
+    if (client != null) {
+      client.close();
+    }
+    client = transportClient;
+  }
+
+  /*
+   * FOR TESTING ONLY...
+   *
+   * Opens a local discovery node for talking to an elasticsearch server running
+   * in the same JVM
+   */
+  private void openLocalDiscoveryClient() {
+    logger.info("Using ElasticSearch AutoDiscovery mode");
+    Node node = NodeBuilder.nodeBuilder().client(true).local(true).node();
+    if (client != null) {
+      client.close();
+    }
+    localNode = node;
+    client = node.client();
+  }
+
+  @Override
+  public void configure(Context context) {
+    // Nothing to configure: all settings are passed to the constructors.
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java
new file mode 100644
index 0000000..41fbe0d
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.elasticsearch.client;
+
+/**
+ * Signals that no ElasticSearch client could be created for the requested
+ * client type. Public because it is a checked exception declared by the public
+ * methods of {@code ElasticSearchClientFactory}, so callers in other packages
+ * must be able to name it.
+ */
+public class NoSuchClientTypeException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates the exception without a detail message.
+   */
+  public NoSuchClientTypeException() {
+    super();
+  }
+
+  /**
+   * Creates the exception with a detail message.
+   *
+   * @param message
+   *          description of the rejected client type
+   */
+  public NoSuchClientTypeException(String message) {
+    super(message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
new file mode 100644
index 0000000..dbad8d8
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
@@ -0,0 +1,44 @@
+package org.apache.flume.sink.elasticsearch.client;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/*
+ * Copyright 2014 Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Hands out the elements of a collection one after another, starting over
+ * from the first element once the last one has been returned.
+ */
+public class RoundRobinList<T> {
+
+  private Iterator<T> iterator;
+  private final Collection<T> elements;
+
+  /**
+   * @param elements
+   *          collection to cycle through; it is iterated directly, not copied
+   */
+  public RoundRobinList(Collection<T> elements) {
+    this.elements = elements;
+    this.iterator = elements.iterator();
+  }
+
+  /**
+   * Returns the next element, wrapping around to the beginning when the
+   * current pass over the collection is exhausted.
+   */
+  public synchronized T get() {
+    if (!iterator.hasNext()) {
+      // End of the current pass: begin a new one.
+      iterator = elements.iterator();
+    }
+    return iterator.next();
+  }
+
+  /**
+   * Returns the number of elements in the underlying collection.
+   */
+  public int size() {
+    return elements.size();
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
index 43a4b12..d4e4654 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java
@@ -18,12 +18,6 @@
*/
package org.apache.flume.sink.elasticsearch;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Map;
-
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
@@ -31,6 +25,12 @@ import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.junit.Test;
+import java.util.Map;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertEquals;
+
public class TestElasticSearchDynamicSerializer {
@Test
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
index 1e4e119..807a9c7 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
@@ -168,27 +168,27 @@ public class TestElasticSearchIndexRequestBuilderFactory
assertTrue(serializer.configuredWithComponentConfiguration);
}
-}
-
-class FakeEventSerializer implements ElasticSearchEventSerializer {
-
- static final byte[] FAKE_BYTES = new byte[] {9,8,7,6};
- boolean configuredWithContext, configuredWithComponentConfiguration;
-
- @Override
- public BytesStream getContentBuilder(Event event) throws IOException {
- FastByteArrayOutputStream fbaos = new FastByteArrayOutputStream(4);
- fbaos.write(FAKE_BYTES);
- return fbaos;
+ static class FakeEventSerializer implements ElasticSearchEventSerializer {
+
+ static final byte[] FAKE_BYTES = new byte[]{9, 8, 7, 6};
+ boolean configuredWithContext, configuredWithComponentConfiguration;
+
+ @Override
+ public BytesStream getContentBuilder(Event event) throws IOException {
+ FastByteArrayOutputStream fbaos = new FastByteArrayOutputStream(4);
+ fbaos.write(FAKE_BYTES);
+ return fbaos;
+ }
+
+ @Override
+ public void configure(Context arg0) {
+ configuredWithContext = true;
+ }
+
+ @Override
+ public void configure(ComponentConfiguration arg0) {
+ configuredWithComponentConfiguration = true;
+ }
}
- @Override
- public void configure(Context arg0) {
- configuredWithContext = true;
- }
-
- @Override
- public void configure(ComponentConfiguration arg0) {
- configuredWithComponentConfiguration = true;
- }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
index 9dff4b0..d2c9543 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java
@@ -18,13 +18,6 @@
*/
package org.apache.flume.sink.elasticsearch;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Date;
-import java.util.Map;
-
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
@@ -32,6 +25,13 @@ import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.junit.Test;
+import java.util.Date;
+import java.util.Map;
+
+import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertEquals;
+
public class TestElasticSearchLogStashEventSerializer {
@Test
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index 71789e8..15546c1 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -20,7 +20,6 @@ package org.apache.flume.sink.elasticsearch;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
-import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
@@ -29,14 +28,15 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.lang.time.FastDateFormat;
+
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -49,7 +49,8 @@ import org.apache.flume.event.EventBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.UUID;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.io.BytesStream;
+import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -169,8 +170,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
fixture = new ElasticSearchSink();
fixture.configure(new Context(parameters));
- InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
- "10.5.5.27", DEFAULT_PORT) };
+ String[] expected = { "10.5.5.27" };
assertEquals("testing-cluster-name", fixture.getClusterName());
assertEquals("testing-index-name", fixture.getIndexName());
@@ -189,8 +189,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
fixture = new ElasticSearchSink();
fixture.configure(new Context(parameters));
- InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
- "10.5.5.27", DEFAULT_PORT) };
+ String[] expected = { "10.5.5.27" };
assertEquals(DEFAULT_INDEX_NAME, fixture.getIndexName());
assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType());
@@ -205,10 +204,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
fixture = new ElasticSearchSink();
fixture.configure(new Context(parameters));
- InetSocketTransportAddress[] expected = {
- new InetSocketTransportAddress("10.5.5.27", DEFAULT_PORT),
- new InetSocketTransportAddress("10.5.5.28", DEFAULT_PORT),
- new InetSocketTransportAddress("10.5.5.29", DEFAULT_PORT) };
+ String[] expected = { "10.5.5.27", "10.5.5.28", "10.5.5.29" };
assertArrayEquals(expected, fixture.getServerAddresses());
}
@@ -220,10 +216,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
fixture = new ElasticSearchSink();
fixture.configure(new Context(parameters));
- InetSocketTransportAddress[] expected = {
- new InetSocketTransportAddress("10.5.5.27", DEFAULT_PORT),
- new InetSocketTransportAddress("10.5.5.28", DEFAULT_PORT),
- new InetSocketTransportAddress("10.5.5.29", DEFAULT_PORT) };
+ String[] expected = { "10.5.5.27", "10.5.5.28", "10.5.5.29" };
assertArrayEquals(expected, fixture.getServerAddresses());
}
@@ -235,25 +228,20 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
fixture = new ElasticSearchSink();
fixture.configure(new Context(parameters));
- InetSocketTransportAddress[] expected = {
- new InetSocketTransportAddress("10.5.5.27", 9300),
- new InetSocketTransportAddress("10.5.5.28", 9301),
- new InetSocketTransportAddress("10.5.5.29", 9302) };
+ String[] expected = { "10.5.5.27:9300", "10.5.5.28:9301", "10.5.5.29:9302" };
assertArrayEquals(expected, fixture.getServerAddresses());
}
@Test
public void shouldParseMultipleHostAndPortsWithWhitespaces() {
- parameters.put(HOSTNAMES, " 10.5.5.27 : 9300 , 10.5.5.28 : 9301 , 10.5.5.29 : 9302 ");
+ parameters.put(HOSTNAMES,
+ " 10.5.5.27 : 9300 , 10.5.5.28 : 9301 , 10.5.5.29 : 9302 ");
fixture = new ElasticSearchSink();
fixture.configure(new Context(parameters));
- InetSocketTransportAddress[] expected = {
- new InetSocketTransportAddress("10.5.5.27", 9300),
- new InetSocketTransportAddress("10.5.5.28", 9301),
- new InetSocketTransportAddress("10.5.5.29", 9302) };
+ String[] expected = { "10.5.5.27:9300", "10.5.5.28:9301", "10.5.5.29:9302" };
assertArrayEquals(expected, fixture.getServerAddresses());
}
@@ -261,11 +249,10 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
@Test
public void shouldAllowCustomElasticSearchIndexRequestBuilderFactory()
throws Exception {
-
parameters.put(SERIALIZER,
CustomElasticSearchIndexRequestBuilderFactory.class.getName());
- Configurables.configure(fixture, new Context(parameters));
+ fixture.configure(new Context(parameters));
Channel channel = bindAndStartChannel(fixture);
Transaction tx = channel.getTransaction();
@@ -279,7 +266,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
fixture.process();
fixture.stop();
- assertEquals(fixture.getIndexName()+"-05_17_36_789",
+ assertEquals(fixture.getIndexName() + "-05_17_36_789",
CustomElasticSearchIndexRequestBuilderFactory.actualIndexName);
assertEquals(fixture.getIndexType(),
CustomElasticSearchIndexRequestBuilderFactory.actualIndexType);
@@ -289,7 +276,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
}
@Test
- public void shouldParseFullyQualifiedTTLs(){
+ public void shouldParseFullyQualifiedTTLs() {
Map<String, Long> testTTLMap = new HashMap<String, Long>();
testTTLMap.put("1ms", Long.valueOf(1));
testTTLMap.put("1s", Long.valueOf(1000));
@@ -297,7 +284,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
testTTLMap.put("1h", Long.valueOf(3600000));
testTTLMap.put("1d", Long.valueOf(86400000));
testTTLMap.put("1w", Long.valueOf(604800000));
- testTTLMap.put("1", Long.valueOf(86400000));
+ testTTLMap.put("1", Long.valueOf(86400000));
parameters.put(HOSTNAMES, "10.5.5.27");
parameters.put(CLUSTER_NAME, "testing-cluster-name");
@@ -309,13 +296,10 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
fixture = new ElasticSearchSink();
fixture.configure(new Context(parameters));
- InetSocketTransportAddress[] expected = {new InetSocketTransportAddress(
- "10.5.5.27", DEFAULT_PORT)};
-
+ String[] expected = { "10.5.5.27" };
assertEquals("testing-cluster-name", fixture.getClusterName());
assertEquals("testing-index-name", fixture.getIndexName());
assertEquals("testing-index-type", fixture.getIndexType());
- System.out.println("TTL MS" + Long.toString(testTTLMap.get(ttl)));
assertEquals((long) testTTLMap.get(ttl), fixture.getTTLMs());
assertArrayEquals(expected, fixture.getServerAddresses());
@@ -374,10 +358,84 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
}
}
+  /**
+   * Checks that configuring the sink with the {@code SERIALIZER} key makes it
+   * report a {@link FakeEventSerializer}: the getter is expected to return
+   * {@code null} before {@code configure} and the fake afterwards.
+   */
+  @Test
+  public void shouldUseSpecifiedSerializer() throws Exception {
+    Context serializerContext = new Context();
+    serializerContext.put(SERIALIZER,
+        "org.apache.flume.sink.elasticsearch.FakeEventSerializer");
+
+    // The serializer is expected to be unset until configure() has run.
+    assertNull(fixture.getEventSerializer());
+
+    fixture.configure(serializerContext);
+    Object configuredSerializer = fixture.getEventSerializer();
+    assertTrue(configuredSerializer instanceof FakeEventSerializer);
+  }
+
+  /**
+   * Checks that configuring the sink with the {@code INDEX_NAME_BUILDER} key
+   * makes it report a {@link FakeIndexNameBuilder}: the getter is expected to
+   * return {@code null} before {@code configure} and the fake afterwards.
+   */
+  @Test
+  public void shouldUseSpecifiedIndexNameBuilder() throws Exception {
+    Context builderContext = new Context();
+    builderContext.put(ElasticSearchSinkConstants.INDEX_NAME_BUILDER,
+        "org.apache.flume.sink.elasticsearch.FakeIndexNameBuilder");
+
+    // The index name builder is expected to be unset until configure() has run.
+    assertNull(fixture.getIndexNameBuilder());
+
+    fixture.configure(builderContext);
+    Object configuredBuilder = fixture.getIndexNameBuilder();
+    assertTrue(configuredBuilder instanceof FakeIndexNameBuilder);
+  }
+
public static class FakeConfigurable implements Configurable {
@Override
public void configure(Context arg0) {
- // no-op
+ // no-op
}
}
}
+
+/**
+ * Internal class. Fake event serializer used only for tests: it produces the
+ * same fixed byte sequence for every event and records which
+ * {@code configure} overload has been called.
+ */
+class FakeEventSerializer implements ElasticSearchEventSerializer {
+
+  /** Payload written for every event, whatever the event contains. */
+  static final byte[] FAKE_BYTES = { 9, 8, 7, 6 };
+
+  /** Becomes {@code true} once {@link #configure(Context)} is called. */
+  boolean configuredWithContext;
+
+  /**
+   * Becomes {@code true} once {@link #configure(ComponentConfiguration)} is
+   * called.
+   */
+  boolean configuredWithComponentConfiguration;
+
+  /**
+   * Returns a stream holding {@link #FAKE_BYTES}; the event itself is not
+   * read.
+   */
+  @Override
+  public BytesStream getContentBuilder(Event event) throws IOException {
+    // Sized to the payload so the buffer starts at exactly four bytes.
+    FastByteArrayOutputStream stream =
+        new FastByteArrayOutputStream(FAKE_BYTES.length);
+    stream.write(FAKE_BYTES);
+    return stream;
+  }
+
+  /** Only records that the {@link Context} overload was invoked. */
+  @Override
+  public void configure(Context arg0) {
+    this.configuredWithContext = true;
+  }
+
+  /** Only records that the {@link ComponentConfiguration} overload was invoked. */
+  @Override
+  public void configure(ComponentConfiguration arg0) {
+    this.configuredWithComponentConfiguration = true;
+  }
+}
+
+/**
+ * Internal class. Fake index name builder used only for tests: both the index
+ * name and the index prefix are the constant {@link #INDEX_NAME}, and
+ * configuration is ignored.
+ */
+class FakeIndexNameBuilder implements IndexNameBuilder {
+
+  /** Value returned for every event by both lookup methods. */
+  static final String INDEX_NAME = "index_name";
+
+  @Override
+  public void configure(Context context) {
+    // no-op: the fake has nothing to configure
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // no-op: the fake has nothing to configure
+  }
+
+  /** Returns {@link #INDEX_NAME}; the event is not read. */
+  @Override
+  public String getIndexName(Event event) {
+    return INDEX_NAME;
+  }
+
+  /** Returns {@link #INDEX_NAME}; the event is not read. */
+  @Override
+  public String getIndexPrefix(Event event) {
+    return INDEX_NAME;
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/e12f0a7a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java
new file mode 100644
index 0000000..678342a
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.SimpleEvent;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TimeBasedIndexNameBuilder}, configured in {@link #setUp()}
+ * with the index name {@code "prefix"}.
+ */
+public class TimeBasedIndexNameBuilderTest {
+
+  private TimeBasedIndexNameBuilder indexNameBuilder;
+
+  /** Creates a fresh builder configured with {@code "prefix"} as index name. */
+  @Before
+  public void setUp() throws Exception {
+    Context context = new Context();
+    context.put(ElasticSearchSinkConstants.INDEX_NAME, "prefix");
+    indexNameBuilder = new TimeBasedIndexNameBuilder();
+    indexNameBuilder.configure(context);
+  }
+
+  /**
+   * The time zone of the builder's date format must be named
+   * "Coordinated Universal Time".
+   */
+  @Test
+  public void shouldUseUtcAsBasisForDateFormat() {
+    // The no-arg getDisplayName() localises to the JVM default locale, which
+    // would make this comparison fail on a non-English machine; ask for the
+    // English name explicitly so the expectation holds everywhere.
+    assertEquals("Coordinated Universal Time",
+        indexNameBuilder.getFastDateFormat().getTimeZone()
+            .getDisplayName(Locale.ENGLISH));
+  }
+
+  /**
+   * For an event carrying a {@code timestamp} header, the index name must be
+   * {@code "prefix-"} followed by that timestamp rendered with the builder's
+   * own date format.
+   */
+  @Test
+  public void indexNameShouldBePrefixDashFormattedTimestamp() {
+    long time = 987654321L;
+    Event event = new SimpleEvent();
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put("timestamp", Long.toString(time));
+    event.setHeaders(headers);
+    assertEquals("prefix-" + indexNameBuilder.getFastDateFormat().format(time),
+        indexNameBuilder.getIndexName(event));
+  }
+}