You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/09/01 05:31:44 UTC
[2/2] incubator-gobblin git commit: [GOBBLIN-17] Add Elasticsearch
writer (rest + transport)
[GOBBLIN-17] Add Elasticsearch writer (rest + transport)
Closes #2419 from shirshanka/elastic
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f1bc746c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f1bc746c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f1bc746c
Branch: refs/heads/master
Commit: f1bc746ca50cffa1247c00b6c5bdd34b7321198d
Parents: ef438c8
Author: Shirshanka Das <sd...@linkedin.com>
Authored: Fri Aug 31 22:31:21 2018 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Fri Aug 31 22:31:45 2018 -0700
----------------------------------------------------------------------
.../gobblin/writer/AsyncWriterManager.java | 3 +
.../java/org/apache/gobblin/writer/Batch.java | 22 +-
.../gobblin/writer/BufferedAsyncDataWriter.java | 4 +-
.../gobblin/writer/BytesBoundedBatch.java | 10 +-
.../gobblin/writer/LargeMessagePolicy.java | 26 ++
.../gobblin/writer/RecordTooLargeException.java | 20 ++
.../writer/SequentialBasedBatchAccumulator.java | 65 +++--
.../gobblin-flavor-standard.gradle | 1 +
.../src/main/resources/wikipedia-elastic.conf | 64 +++++
.../gobblin-elasticsearch-deps/build.gradle | 50 ++++
.../gobblin-elasticsearch/build.gradle | 76 ++++++
.../scripts/install_test_deps.sh | 40 +++
.../scripts/uninstall_test_deps.sh | 23 ++
.../AvroGenericRecordSerializer.java | 80 ++++++
.../AvroGenericRecordTypeMapper.java | 71 ++++++
.../typemapping/FieldMappingException.java | 35 +++
.../typemapping/GsonJsonSerializer.java | 52 ++++
.../typemapping/JsonSerializer.java | 30 +++
.../typemapping/JsonTypeMapper.java | 56 +++++
.../typemapping/SerializationException.java | 31 +++
.../elasticsearch/typemapping/TypeMapper.java | 36 +++
.../writer/ElasticsearchDataWriterBuilder.java | 83 +++++++
.../writer/ElasticsearchRestWriter.java | 232 ++++++++++++++++++
.../ElasticsearchTransportClientWriter.java | 118 +++++++++
.../writer/ElasticsearchWriterBase.java | 168 +++++++++++++
.../ElasticsearchWriterConfigurationKeys.java | 71 ++++++
.../elasticsearch/writer/ExceptionLogger.java | 26 ++
.../writer/FutureCallbackHolder.java | 193 +++++++++++++++
.../writer/MalformedDocPolicy.java | 26 ++
.../elasticsearch/ElasticsearchTestServer.java | 217 +++++++++++++++++
.../ElasticsearchTestServerTest.java | 50 ++++
.../elasticsearch/writer/ConfigBuilder.java | 72 ++++++
.../ElasticsearchTransportClientWriterTest.java | 54 +++++
.../writer/ElasticsearchWriterBaseTest.java | 113 +++++++++
.../ElasticsearchWriterIntegrationTest.java | 243 +++++++++++++++++++
.../elasticsearch/writer/RestWriterVariant.java | 97 ++++++++
.../elasticsearch/writer/TestClient.java | 37 +++
.../writer/TransportWriterVariant.java | 96 ++++++++
.../elasticsearch/writer/WriterVariant.java | 40 +++
.../gobblin/test/AvroRecordGenerator.java | 104 ++++++++
.../gobblin/test/JsonRecordGenerator.java | 75 ++++++
.../org/apache/gobblin/test/PayloadType.java | 27 +++
.../gobblin/test/RecordTypeGenerator.java | 43 ++++
.../eventhub/writer/EventhubBatchTest.java | 35 +--
.../java/org/apache/gobblin/test/TestUtils.java | 21 ++
gradle/scripts/globalDependencies.gradle | 30 +--
46 files changed, 3005 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
index 2be89c6..a599753 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
@@ -340,6 +340,8 @@ public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWrite
.update(currTime - attempt.getPrevAttemptTimestampNanos(), TimeUnit.NANOSECONDS);
}
if (attempt.attemptNum <= AsyncWriterManager.this.numRetries) { // attempts must == numRetries + 1
+ log.debug("Attempt {} had failure: {}; re-enqueueing record: {}", attempt.attemptNum, throwable.getMessage(),
+ attempt.getRecord().toString());
attempt.incAttempt();
attempt.setPrevAttemptFailure(throwable);
AsyncWriterManager.this.retryQueue.get().add(attempt);
@@ -391,6 +393,7 @@ public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWrite
Attempt attempt = this.retryQueue.take();
if (attempt != null) {
maybeSleep(attempt.getPrevAttemptTimestampNanos());
+ log.debug("Retry thread will retry record: {}", attempt.getRecord().toString());
attemptWrite(attempt);
}
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
index ff16590..faf815c 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
@@ -127,17 +127,18 @@ public abstract class Batch<D>{
/**
* A method to check if the batch has the room to add a new record
* @param record: record needs to be added
+ * @param largeMessagePolicy: the policy that is in effect for large messages
* @return Indicates if this batch still have enough space to hold a new record
*/
- public abstract boolean hasRoom (D record);
+ public abstract boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy);
/**
* Add a record to this batch
* <p>
* Implementation of this method should always ensure the record can be added successfully
- * The contract between {@link Batch#tryAppend(Object, WriteCallback)} and this method is this method
+ * The contract between {@link Batch#tryAppend(Object, WriteCallback, LargeMessagePolicy)} and this method is this method
* is responsible for adding record to internal batch memory and the check for the room space is performed
- * by {@link Batch#hasRoom(Object)}. All the potential issues for adding a record should
+ * by {@link Batch#hasRoom(Object, LargeMessagePolicy)}. All the potential issues for adding a record should
* already be resolved before this method is invoked.
* </p>
*
@@ -162,14 +163,19 @@ public abstract class Batch<D>{
*
* @param record : record needs to be added
* @param callback : A callback which will be invoked when the whole batch gets sent and acknowledged
+ * @param largeMessagePolicy : the {@link LargeMessagePolicy} that is in effect for this batch
* @return A future object which contains {@link RecordMetadata}
*/
- public Future<RecordMetadata> tryAppend(D record, WriteCallback callback) {
- if (!hasRoom(record)) {
- LOG.debug ("Cannot add " + record + " to previous batch because the batch already has " + getCurrentSizeInByte() + " bytes");
+ public Future<RecordMetadata> tryAppend(D record, WriteCallback callback, LargeMessagePolicy largeMessagePolicy)
+ throws RecordTooLargeException {
+ if (!hasRoom(record, largeMessagePolicy)) {
+ LOG.debug ("Cannot add {} to previous batch because the batch already has {} bytes",
+ record.toString(), getCurrentSizeInByte());
+ if (largeMessagePolicy == LargeMessagePolicy.FAIL) {
+ throw new RecordTooLargeException();
+ }
return null;
}
-
this.append(record);
thunks.add(new Thunk(callback, getRecordSizeInByte(record)));
RecordFuture future = new RecordFuture(latch, recordCount);
@@ -178,7 +184,9 @@ public abstract class Batch<D>{
}
public void await() throws InterruptedException{
+ LOG.debug("Batch {} waiting for {} records", this.id, this.recordCount);
this.latch.await();
+ LOG.debug("Batch {} done with {} records", this.id, this.recordCount);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
index ceaffec..87039b6 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
@@ -39,7 +39,7 @@ import org.apache.gobblin.annotation.Alpha;
* @param <D> data record type
*/
@Alpha
-public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
+public class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
private RecordProcessor<D> processor;
private BatchAccumulator<D> accumulator;
@@ -136,7 +136,7 @@ public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
return new WriteCallback<Object>() {
@Override
public void onSuccess(WriteResponse writeResponse) {
- LOG.info ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
+ LOG.debug ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
batch.onSuccess(writeResponse);
batch.done();
accumulator.deallocate(batch);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
index ef63882..7b6b4dc 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
@@ -62,7 +62,11 @@ public class BytesBoundedBatch<D> extends Batch<D>{
records.add(record);
}
- boolean hasRoom (D record) {
+ boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy) {
+ if (records.isEmpty() && largeMessagePolicy == LargeMessagePolicy.ATTEMPT) {
+ // there is always space for one record, no matter how big :)
+ return true;
+ }
long recordLen = BytesBoundedBatch.this.getInternalSize(record);
return (byteSize + recordLen) <= BytesBoundedBatch.this.memSizeLimit;
}
@@ -80,8 +84,8 @@ public class BytesBoundedBatch<D> extends Batch<D>{
return memory.getRecords();
}
- public boolean hasRoom (D object) {
- return memory.hasRoom(object);
+ public boolean hasRoom (D object, LargeMessagePolicy largeMessagePolicy) {
+ return memory.hasRoom(object, largeMessagePolicy);
}
public void append (D object) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java
new file mode 100644
index 0000000..28ca949
--- /dev/null
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+/**
+ * Describes how single messages that are larger than a batch message limit should be treated
+ */
+public enum LargeMessagePolicy {
+ DROP, // drop (and log) messages that exceed the threshold
+ ATTEMPT, // attempt to deliver messages that exceed the threshold
+ FAIL // throw an error when this happens
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java
new file mode 100644
index 0000000..845e6a8
--- /dev/null
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+public class RecordTooLargeException extends Exception {
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
index 9b7a608..58b0942 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
@@ -43,14 +43,16 @@ import org.apache.gobblin.util.ConfigUtils;
* keeps in the deque until a TTL is expired.
*/
-public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {
+public class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {
+ private static final LargeMessagePolicy DEFAULT_LARGE_MESSAGE_POLICY = LargeMessagePolicy.FAIL;
private Deque<BytesBoundedBatch<D>> dq = new LinkedList<>();
private IncompleteRecordBatches incomplete = new IncompleteRecordBatches();
private final long batchSizeLimit;
private final long memSizeLimit;
private final double tolerance = 0.95;
private final long expireInMilliSecond;
+ private final LargeMessagePolicy largeMessagePolicy;
private static final Logger LOG = LoggerFactory.getLogger(SequentialBasedBatchAccumulator.class);
private final ReentrantLock dqLock = new ReentrantLock();
@@ -63,24 +65,31 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
}
public SequentialBasedBatchAccumulator(Properties properties) {
- Config config = ConfigUtils.propertiesToConfig(properties);
- this.batchSizeLimit = ConfigUtils.getLong(config, Batch.BATCH_SIZE,
- Batch.BATCH_SIZE_DEFAULT);
-
- this.expireInMilliSecond = ConfigUtils.getLong(config, Batch.BATCH_TTL,
- Batch.BATCH_TTL_DEFAULT);
-
- this.capacity = ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY,
- Batch.BATCH_QUEUE_CAPACITY_DEFAULT);
+ this(ConfigUtils.propertiesToConfig(properties));
+ }
- this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit);
+ public SequentialBasedBatchAccumulator(Config config) {
+ this(ConfigUtils.getLong(config, Batch.BATCH_SIZE,
+ Batch.BATCH_SIZE_DEFAULT),
+ ConfigUtils.getLong(config, Batch.BATCH_TTL,
+ Batch.BATCH_TTL_DEFAULT),
+ ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY,
+ Batch.BATCH_QUEUE_CAPACITY_DEFAULT));
}
public SequentialBasedBatchAccumulator(long batchSizeLimit, long expireInMilliSecond, long capacity) {
+ this(batchSizeLimit, expireInMilliSecond, capacity, DEFAULT_LARGE_MESSAGE_POLICY);
+ }
+
+ public SequentialBasedBatchAccumulator(long batchSizeLimit,
+ long expireInMilliSecond,
+ long capacity,
+ LargeMessagePolicy largeMessagePolicy) {
this.batchSizeLimit = batchSizeLimit;
this.expireInMilliSecond = expireInMilliSecond;
this.capacity = capacity;
this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit);
+ this.largeMessagePolicy = largeMessagePolicy;
}
public long getNumOfBatches () {
@@ -101,7 +110,12 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
try {
BytesBoundedBatch last = dq.peekLast();
if (last != null) {
- Future<RecordMetadata> future = last.tryAppend(record, callback);
+ Future<RecordMetadata> future = null;
+ try {
+ future = last.tryAppend(record, callback, this.largeMessagePolicy);
+ } catch (RecordTooLargeException e) {
+ // Ok if the record was too large for the current batch
+ }
if (future != null) {
return future;
}
@@ -110,12 +124,18 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
// Create a new batch because previous one has no space
BytesBoundedBatch batch = new BytesBoundedBatch(this.memSizeLimit, this.expireInMilliSecond);
LOG.debug("Batch " + batch.getId() + " is generated");
- Future<RecordMetadata> future = batch.tryAppend(record, callback);
+ Future<RecordMetadata> future = null;
+ try {
+ future = batch.tryAppend(record, callback, this.largeMessagePolicy);
+ } catch (RecordTooLargeException e) {
+ // If a new batch also wasn't able to accommodate the new message
+ throw new RuntimeException("Failed due to a message that was too large", e);
+ }
- // Even single record can exceed the batch size limit
- // Ignore the record because Eventhub can only accept payload less than 256KB
+ // The future might be null, since the largeMessagePolicy might be set to DROP
if (future == null) {
- LOG.error("Batch " + batch.getId() + " is marked as complete because it contains a huge record: "
+ assert largeMessagePolicy.equals(LargeMessagePolicy.DROP);
+ LOG.error("Batch " + batch.getId() + " is silently marked as complete, dropping a huge record: "
+ record);
future = Futures.immediateFuture(new RecordMetadata(0));
callback.onSuccess(WriteResponse.EMPTY);
@@ -124,6 +144,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
// if queue is full, we should not add more
while (dq.size() >= this.capacity) {
+ LOG.debug("Accumulator size {} is greater than capacity {}, waiting", dq.size(), this.capacity);
this.notFull.await();
}
dq.addLast(batch);
@@ -187,7 +208,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
return dq.poll();
} else {
while (dq.size() == 0) {
- LOG.info ("ready to sleep because of queue is empty");
+ LOG.debug ("ready to sleep because of queue is empty");
SequentialBasedBatchAccumulator.this.notEmpty.await();
if (SequentialBasedBatchAccumulator.this.isClosed()) {
return dq.poll();
@@ -203,7 +224,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
if (dq.size() == 1) {
if (dq.peekFirst().isTTLExpire()) {
- LOG.info ("Batch " + dq.peekFirst().getId() + " is expired");
+ LOG.debug ("Batch " + dq.peekFirst().getId() + " is expired");
BytesBoundedBatch candidate = dq.poll();
SequentialBasedBatchAccumulator.this.notFull.signal();
return candidate;
@@ -240,12 +261,16 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
public void flush() {
try {
ArrayList<Batch> batches = this.incomplete.all();
- LOG.info ("flush on {} batches", batches.size());
+ int numOutstandingRecords = 0;
+ for (Batch batch: batches) {
+ numOutstandingRecords += batch.getRecords().size();
+ }
+ LOG.debug ("Flush called on {} batches with {} records total", batches.size(), numOutstandingRecords);
for (Batch batch: batches) {
batch.await();
}
} catch (Exception e) {
- LOG.info ("Error happens when flushing");
+ LOG.error ("Error happened while flushing batches");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-distribution/gobblin-flavor-standard.gradle
----------------------------------------------------------------------
diff --git a/gobblin-distribution/gobblin-flavor-standard.gradle b/gobblin-distribution/gobblin-flavor-standard.gradle
index c2061a5..2f544ca 100644
--- a/gobblin-distribution/gobblin-flavor-standard.gradle
+++ b/gobblin-distribution/gobblin-flavor-standard.gradle
@@ -21,4 +21,5 @@ dependencies {
compile project(':gobblin-modules:gobblin-crypto-provider')
compile project(':gobblin-modules:gobblin-kafka-08')
compile project(':gobblin-modules:google-ingestion')
+ compile project(':gobblin-modules:gobblin-elasticsearch')
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-example/src/main/resources/wikipedia-elastic.conf
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/wikipedia-elastic.conf b/gobblin-example/src/main/resources/wikipedia-elastic.conf
new file mode 100644
index 0000000..9db386e
--- /dev/null
+++ b/gobblin-example/src/main/resources/wikipedia-elastic.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A sample pull file that pulls revisions of Wikipedia pages
+# and writes them to Elasticsearch
+job {
+ name=PullFromWikipediaToElasticSearch
+ group=Wikipedia
+ description=Pull from Wikipedia and write to ElasticSearch
+}
+
+task.maxretries=0
+
+source {
+ class=org.apache.gobblin.example.wikipedia.WikipediaSource
+ page.titles="Wikipedia:Sandbox"
+ revisions.cnt=5
+}
+
+wikipedia {
+ api.rooturl="https://en.wikipedia.org/w/api.php"
+ avro.schema="{\"namespace\": \"example.wikipedia.avro\",\"type\": \"record\",\"name\": \"WikipediaArticle\",\"fields\": [{\"name\": \"revid\", \"type\": [\"double\", \"null\"]},{\"name\": \"pageid\", \"type\": [\"double\", \"null\"]},{\"name\": \"title\", \"type\": [\"string\", \"null\"]},{\"name\": \"user\", \"type\": [\"string\", \"null\"]},{\"name\": \"anon\", \"type\": [\"string\", \"null\"]},{\"name\": \"userid\", \"type\": [\"double\", \"null\"]},{\"name\": \"timestamp\", \"type\": [\"string\", \"null\"]},{\"name\": \"size\", \"type\": [\"double\", \"null\"]},{\"name\": \"contentformat\", \"type\": [\"string\", \"null\"]},{\"name\": \"contentmodel\", \"type\": [\"string\", \"null\"]},{\"name\": \"content\", \"type\": [\"string\", \"null\"]}]}"
+}
+converter.classes=org.apache.gobblin.example.wikipedia.WikipediaConverter
+extract.namespace=org.apache.gobblin.example.wikipedia
+
+writer {
+ builder.class=org.apache.gobblin.elasticsearch.writer.ElasticsearchDataWriterBuilder
+ elasticsearch {
+ client.type=REST
+ index.name=wikipedia-test
+ index.type=docs
+ #hosts=hostname
+ #ssl {
+ # enabled=true
+ # keystoreType=pkcs12
+ # keystorePassword=change_me
+ # keystoreLocation=/path/to/keystore.p12
+ # truststoreType=jks
+ # truststoreLocation=/path/to/cacerts
+ # truststorePassword=changeme
+ #}
+ typeMapperClass=org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper
+ useIdFromData=false # change to true if you want to use a field from the record as the id field
+ #idFieldName=id # change to the field of the record that you want to use as the id of the document
+ }
+}
+
+data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch-deps/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch-deps/build.gradle b/gobblin-modules/gobblin-elasticsearch-deps/build.gradle
new file mode 100644
index 0000000..35e9a36
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch-deps/build.gradle
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+buildscript {
+ repositories {
+ jcenter()
+ }
+ dependencies {
+ classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
+ }
+}
+
+apply plugin: 'com.github.johnrengelman.shadow'
+apply plugin: 'java'
+
+dependencies {
+ compile "org.elasticsearch.client:transport:5.6.8"
+ compile "org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.8"
+ compile "com.google.guava:guava:18.0"
+}
+
+
+configurations {
+ compile {
+ exclude group: "org.apache.hadoop"
+ exclude group: "com.sun.jersey.contribs"
+ }
+}
+
+shadowJar {
+ zip64 true
+ relocate 'com.google.common', 'shadow.gobblin.elasticsearch.com.google.common'
+}
+
+ext.classification="library"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/build.gradle b/gobblin-modules/gobblin-elasticsearch/build.gradle
new file mode 100644
index 0000000..2d624b2
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/build.gradle
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'java'
+
+dependencies {
+ compile project(":gobblin-api")
+ compile project(":gobblin-core-base")
+ compile project(":gobblin-utility")
+ compile project(":gobblin-metrics-libs:gobblin-metrics")
+ compile project(path: ":gobblin-modules:gobblin-elasticsearch-deps", configuration:"shadow")
+
+ compile "org.apache.logging.log4j:log4j-to-slf4j:2.7"
+ compile "org.slf4j:slf4j-api:1.7.21"
+ compile externalDependency.avro
+ compile externalDependency.jacksonCore
+ compile externalDependency.jacksonMapper
+ compile externalDependency.commonsHttpClient
+ compile externalDependency.commonsPool
+ compile externalDependency.commonsLang3
+ compile externalDependency.slf4j
+ compile externalDependency.httpclient
+ compile externalDependency.httpcore
+ compile externalDependency.lombok
+ compile externalDependency.metricsCore
+ compile externalDependency.typesafeConfig
+ compile externalDependency.findBugsAnnotations
+
+ testCompile project(":gobblin-runtime")
+ testCompile project(":gobblin-test-utils")
+ testCompile externalDependency.jsonAssert
+ testCompile externalDependency.mockito
+ testCompile externalDependency.testng
+}
+
+task installTestDependencies(type:Exec) {
+ workingDir "${project.rootDir}/gobblin-modules/gobblin-elasticsearch/"
+ commandLine './scripts/install_test_deps.sh'
+}
+
+task uninstallTestDependencies(type: Exec) {
+ workingDir "${project.rootDir}/gobblin-modules/gobblin-elasticsearch/"
+ commandLine './scripts/uninstall_test_deps.sh'
+
+}
+
+test.dependsOn installTestDependencies
+test.finalizedBy uninstallTestDependencies
+
+configurations {
+ compile {
+ transitive = false
+ }
+}
+
+test {
+ workingDir rootProject.rootDir
+ maxParallelForks = 4
+}
+
+
+ext.classification="library"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh b/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh
new file mode 100755
index 0000000..48324da
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+TARGET_DIR="test-elasticsearch"
+ES_VERSION=5.6.8
+ES_DIR=${TARGET_DIR}/elasticsearch-${ES_VERSION}
+echo ${TARGET_DIR}
+mkdir -p ${TARGET_DIR}
+
+
+ES_TAR=${TARGET_DIR}/elasticsearch-${ES_VERSION}.tar.gz
+ES_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz
+echo ${ES_URL}
+echo ${ES_TAR}
+if [ -d $ES_DIR ];
+then
+ echo "Skipping download since version already found at ${ES_DIR}"
+ echo "Cleaning up directory"
+ rm -rf ${TARGET_DIR}/elasticsearch-${ES_VERSION}
+else
+ echo "$ES_DIR does not exist, downloading"
+ curl -o ${ES_TAR} ${ES_URL}
+fi
+tar -xzf ${ES_TAR} -C ${TARGET_DIR}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh b/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh
new file mode 100755
index 0000000..db79f86
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+TARGET_DIR="test-elasticsearch"
+ES_VERSION=5.6.8
+ES_DIR=${TARGET_DIR}/elasticsearch-${ES_VERSION}
+rm -rf ${TARGET_DIR}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java
new file mode 100644
index 0000000..5242202
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link JsonSerializer} for {@link GenericRecord} objects.
+ *
+ * <p>Serialization delegates to the record's {@code toString()} rendering, which flattens optional
+ * nullable types: e.g. {@code "id": "id-value"} instead of {@code "id": {"string": "id-value"}}.
+ * See <a href="https://issues.apache.org/jira/browse/AVRO-1582">AVRO-1582</a> for a discussion.
+ *
+ * <p>The serializer keeps no state, so {@link #configure(Config)} and {@link #close()} are no-ops.
+ */
+@Slf4j
+public class AvroGenericRecordSerializer implements JsonSerializer<GenericRecord> {
+
+  public AvroGenericRecordSerializer() {
+    // Stateless: nothing to allocate, so nothing needs to be released in close().
+  }
+
+  @Override
+  public void configure(Config config) {
+    // No configuration options.
+  }
+
+  /**
+   * Serializes an Avro record to UTF-8 encoded JSON.
+   *
+   * @param serializable the record to serialize
+   * @return the UTF-8 bytes of the record's JSON rendering
+   * @throws SerializationException if rendering fails (including a {@code null} record)
+   */
+  @Override
+  public byte[] serializeToJson(GenericRecord serializable)
+      throws SerializationException {
+    try {
+      String serialized = serializable.toString();
+      return serialized.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      throw new SerializationException("Could not serializeToJson Avro record", exception);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Nothing to release.
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java
new file mode 100644
index 0000000..0586f3c
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.IOException;
+
+import org.apache.avro.generic.GenericRecord;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link TypeMapper} for Avro {@link GenericRecord}s. Records are serialized with an
+ * {@link AvroGenericRecordSerializer}, and field values are read with {@link GenericRecord#get(String)}.
+ */
+@Slf4j
+public class AvroGenericRecordTypeMapper implements TypeMapper<GenericRecord> {
+
+  private final JsonSerializer<GenericRecord> serializer;
+  private final Closer closer;
+
+  public AvroGenericRecordTypeMapper() {
+    this.closer = Closer.create();
+    // Registered with the closer so that close() also closes the serializer.
+    this.serializer = this.closer.register(new AvroGenericRecordSerializer());
+  }
+
+  @Override
+  public void configure(Config config) {
+    this.serializer.configure(config);
+    log.info("AvroGenericRecordTypeMapper successfully configured");
+  }
+
+  @Override
+  public JsonSerializer<GenericRecord> getSerializer() {
+    return this.serializer;
+  }
+
+  /**
+   * Returns the string form of the value stored under {@code fieldName}.
+   *
+   * @param fieldName name of the field to read
+   * @param record the record to read from
+   * @return {@code toString()} of the field's value
+   * @throws FieldMappingException if the value is {@code null} or absent, or the lookup fails
+   */
+  @Override
+  public String getValue(String fieldName, GenericRecord record)
+      throws FieldMappingException {
+    try {
+      Object value = record.get(fieldName);
+      if (value == null) {
+        // Reported explicitly instead of surfacing as a NullPointerException from toString().
+        throw new FieldMappingException("Could not find field " + fieldName);
+      }
+      return value.toString();
+    } catch (FieldMappingException e) {
+      throw e;
+    } catch (Exception e) {
+      // e.g. a null record, or an Avro version that throws for unknown field names
+      throw new FieldMappingException("Could not find field " + fieldName, e);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    this.closer.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java
new file mode 100644
index 0000000..781f918
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+/**
+ * Signals that a field could not be read from a record during field-based access
+ * (see {@link TypeMapper#getValue(String, Object)}).
+ */
+public class FieldMappingException extends Exception {
+
+  /**
+   * @param message description of the mapping failure
+   */
+  public FieldMappingException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of the mapping failure
+   * @param cause the exception that caused the failure
+   */
+  public FieldMappingException(String message, Exception cause) {
+    super(message, cause);
+  }
+
+  /**
+   * @param cause the exception that caused the failure
+   */
+  public FieldMappingException(Exception cause) {
+    super(cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java
new file mode 100644
index 0000000..d44986c
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+
+
+/**
+ * A Gson based {@link JsonSerializer} that accepts any object and produces UTF-8 encoded JSON.
+ */
+public class GsonJsonSerializer implements JsonSerializer<Object> {
+
+  private final Gson gson = new Gson();
+
+  @Override
+  public void configure(Config config) {
+    // No configuration options.
+  }
+
+  /**
+   * Serializes an object with {@link Gson#toJson(Object)}.
+   *
+   * @param serializable the object to serialize
+   * @return the UTF-8 bytes of the JSON string
+   * @throws SerializationException declared by the {@link JsonSerializer} contract; UTF-8 is always
+   *     available, so no encoding failure can occur here
+   */
+  @Override
+  public byte[] serializeToJson(Object serializable)
+      throws SerializationException {
+    String jsonString = this.gson.toJson(serializable);
+    return jsonString.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Nothing to release.
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java
new file mode 100644
index 0000000..41f2885
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.Closeable;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * Converts records of type {@code T} into JSON, returned as bytes.
+ *
+ * @param <T> the record type this serializer accepts
+ */
+public interface JsonSerializer<T> extends Closeable {
+
+  /**
+   * Applies configuration to this serializer.
+   *
+   * @param config the configuration to apply
+   */
+  void configure(Config config);
+
+  /**
+   * Serializes a single record to JSON.
+   *
+   * @param serializable the record to serialize
+   * @return the JSON representation as bytes
+   * @throws SerializationException if the record cannot be serialized
+   */
+  byte[] serializeToJson(T serializable) throws SerializationException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java
new file mode 100644
index 0000000..8491147
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.IOException;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.typesafe.config.Config;
+
+
+/**
+ * A {@link TypeMapper} for Gson {@link JsonElement} records, serialized with a {@link GsonJsonSerializer}.
+ */
+public class JsonTypeMapper implements TypeMapper<JsonElement> {
+
+  private final JsonSerializer<Object> serializer = new GsonJsonSerializer();
+
+  @Override
+  public void configure(Config config) {
+    // Forward configuration to the serializer (same pattern as AvroGenericRecordTypeMapper).
+    this.serializer.configure(config);
+  }
+
+  /**
+   * Returns the Gson serializer. The unchecked cast is safe because {@link JsonSerializer} only
+   * consumes values of its type parameter, and {@link GsonJsonSerializer} accepts any {@code Object}.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public JsonSerializer<JsonElement> getSerializer() {
+    return (JsonSerializer<JsonElement>) (JsonSerializer<?>) this.serializer;
+  }
+
+  /**
+   * Returns the string value of a top-level field of a JSON object record.
+   *
+   * @param fieldName name of the field to read
+   * @param record the record, expected to be a JSON object
+   * @return the field's value as a string
+   * @throws FieldMappingException if the record is not a JSON object, the field is missing or JSON
+   *     null, or the value has no string form
+   */
+  @Override
+  public String getValue(String fieldName, JsonElement record)
+      throws FieldMappingException {
+    // An explicit check instead of `assert`, which is skipped unless the JVM runs with -ea.
+    if (record == null || !record.isJsonObject()) {
+      throw new FieldMappingException("Record is not a JSON object, could not find field :" + fieldName);
+    }
+    JsonObject jsonObject = record.getAsJsonObject();
+    JsonElement value = jsonObject.get(fieldName);
+    if (value == null || value.isJsonNull()) {
+      throw new FieldMappingException("Could not find field :" + fieldName);
+    }
+    try {
+      return value.getAsString();
+    } catch (UnsupportedOperationException | IllegalStateException | ClassCastException e) {
+      // Gson reports values without a string form (e.g. nested objects) with one of these unchecked
+      // exceptions; the exact type varies across Gson versions.
+      throw new FieldMappingException("Field :" + fieldName + " does not have a string value", e);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    this.serializer.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java
new file mode 100644
index 0000000..d2edb53
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+/**
+ * Thrown by {@link JsonSerializer} implementations when a record cannot be converted to JSON.
+ */
+public class SerializationException extends Exception {
+
+  /**
+   * @param message description of the serialization failure
+   * @param cause the exception that caused the failure
+   */
+  public SerializationException(String message, Exception cause) {
+    super(message, cause);
+  }
+
+  /**
+   * @param cause the exception that caused the failure; per {@link Throwable#Throwable(Throwable)},
+   *     its {@code toString()} becomes this exception's detail message
+   */
+  public SerializationException(Exception cause) {
+    super(cause);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java
new file mode 100644
index 0000000..5aa909b
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.Closeable;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * An interface that enables the ElasticSearch writer to work with different types of records.
+ * A mapper supplies the {@link JsonSerializer} for its record type and can read individual field
+ * values (such as an id) out of a record.
+ *
+ * @param <T> the record type handled by this mapper
+ */
+public interface TypeMapper<T> extends Closeable {
+
+  /**
+   * Applies configuration to this mapper.
+   *
+   * @param config the configuration to apply
+   */
+  void configure(Config config);
+
+  /**
+   * @return the serializer used to convert records of type {@code T} to JSON
+   */
+  JsonSerializer<T> getSerializer();
+
+  /**
+   * Reads a field from a record as a string.
+   *
+   * @param fieldName name of the field to read
+   * @param record the record to read from
+   * @return the field's value as a string
+   * @throws FieldMappingException if the field cannot be read
+   */
+  String getValue(String fieldName, T record) throws FieldMappingException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java
new file mode 100644
index 0000000..cb6ed15
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncWriterManager;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.BufferedAsyncDataWriter;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.gobblin.writer.SequentialBasedBatchAccumulator;
+
+import com.google.gson.JsonObject;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.State;
+
+/**
+ * Builds an Elasticsearch {@link DataWriter}. The result is an {@link AsyncWriterManager} around a
+ * {@link BufferedAsyncDataWriter}, which batches records and hands them to either the REST or the
+ * transport client writer, as selected by the client-type configuration key.
+ */
+public class ElasticsearchDataWriterBuilder extends DataWriterBuilder {
+
+  @Override
+  public DataWriter build() throws IOException {
+
+    State state = this.destination.getProperties();
+    Properties taskProps = state.getProperties();
+    Config config = ConfigUtils.propertiesToConfig(taskProps);
+
+    SequentialBasedBatchAccumulator<JsonObject> batchAccumulator = new SequentialBasedBatchAccumulator<>(taskProps);
+
+    BatchAsyncDataWriter asyncDataWriter;
+    switch (getClientType(config)) {
+      case REST: {
+        asyncDataWriter = new ElasticsearchRestWriter(config);
+        break;
+      }
+      case TRANSPORT: {
+        asyncDataWriter = new ElasticsearchTransportClientWriter(config);
+        break;
+      }
+      default: {
+        throw new IllegalArgumentException("Need to specify which "
+            + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE
+            + " client to use (rest/transport)");
+      }
+    }
+    BufferedAsyncDataWriter bufferedAsyncDataWriter = new BufferedAsyncDataWriter(batchAccumulator, asyncDataWriter);
+
+    // Configured as a percentage; AsyncWriterManager expects a ratio.
+    double failureAllowance = ConfigUtils.getDouble(config, ElasticsearchWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG,
+        ElasticsearchWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT) / 100.0;
+    boolean retriesEnabled = ConfigUtils.getBoolean(config, ElasticsearchWriterConfigurationKeys.RETRIES_ENABLED,
+        ElasticsearchWriterConfigurationKeys.RETRIES_ENABLED_DEFAULT);
+    int maxRetries = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.MAX_RETRIES,
+        ElasticsearchWriterConfigurationKeys.MAX_RETRIES_DEFAULT);
+
+    return AsyncWriterManager.builder()
+        .failureAllowanceRatio(failureAllowance)
+        .retriesEnabled(retriesEnabled)
+        .numRetries(maxRetries)
+        .config(config)
+        .asyncDataWriter(bufferedAsyncDataWriter)
+        .build();
+  }
+
+  /**
+   * Reads the configured client type, case-insensitively.
+   *
+   * @param config the writer configuration
+   * @return the selected client type
+   * @throws IllegalArgumentException if the configured value is not a known client type
+   */
+  private static ElasticsearchWriterConfigurationKeys.ClientType getClientType(Config config) {
+    String clientType = ConfigUtils.getString(config,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE_DEFAULT);
+    try {
+      // Locale.ROOT keeps upper-casing independent of the JVM's default locale.
+      return ElasticsearchWriterConfigurationKeys.ClientType.valueOf(clientType.toUpperCase(java.util.Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Unsupported value '" + clientType + "' for "
+          + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE
+          + "; need to specify which client to use (rest/transport)", e);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
new file mode 100644
index 0000000..7cd77da
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.password.PasswordManager;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.Batch;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.GenericWriteResponse;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+import org.apache.http.HttpHost;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link BatchAsyncDataWriter} that sends each batch to Elasticsearch as a bulk request through
+ * the REST client. TLS with a client key store and a trust store can optionally be enabled.
+ */
+@Slf4j
+public class ElasticsearchRestWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter<Object> {
+
+  private final RestHighLevelClient client;
+  private final RestClient lowLevelClient;
+
+  /**
+   * Creates the writer and its REST clients.
+   *
+   * <p>When SSL is enabled, key store and trust store types, locations and passwords are read from
+   * {@code config}; passwords are resolved through {@link PasswordManager}.
+   *
+   * @param config the writer configuration
+   * @throws IOException if the REST client cannot be created (the original cause is attached)
+   */
+  ElasticsearchRestWriter(Config config)
+      throws IOException {
+    super(config);
+
+    int threadCount = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_DEFAULT);
+    try {
+      PasswordManager passwordManager = PasswordManager.getInstance();
+      boolean sslEnabled = ConfigUtils.getBoolean(config,
+          ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED,
+          ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT);
+      if (sslEnabled) {
+        // keystore: loaded as key material (the client's identity)
+        String keyStoreType = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE,
+                ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT);
+        String keyStoreFilePassword = passwordManager.readPassword(ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD, ""));
+        String identityFilepath = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION, "");
+
+        // truststore: loaded as trust material (certificates the client accepts)
+        String trustStoreType = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE,
+                ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT);
+        String trustStoreFilePassword = passwordManager.readPassword(ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD, ""));
+        String cacertsFilepath = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION, "");
+        String truststoreAbsolutePath = Paths.get(cacertsFilepath).toAbsolutePath().normalize().toString();
+        log.info("Truststore absolutePath is:" + truststoreAbsolutePath);
+
+        this.lowLevelClient =
+            buildRestClient(this.hostAddresses, threadCount, true, keyStoreType, keyStoreFilePassword, identityFilepath,
+                trustStoreType, trustStoreFilePassword, cacertsFilepath);
+      } else {
+        this.lowLevelClient = buildRestClient(this.hostAddresses, threadCount);
+      }
+      this.client = new RestHighLevelClient(this.lowLevelClient);
+
+      log.info("Elasticsearch Rest Writer configured successfully with: indexName={}, "
+              + "indexType={}, idMappingEnabled={}, typeMapperClassName={}, ssl={}",
+          this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper.getClass().getCanonicalName(),
+          sslEnabled);
+
+    } catch (Exception e) {
+      throw new IOException("Failed to instantiate rest elasticsearch client", e);
+    }
+  }
+
+  @Override
+  int getDefaultPort() {
+    return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_REST_WRITER_DEFAULT_PORT;
+  }
+
+  /** Builds a plain-HTTP REST client for {@code hosts}. */
+  private static RestClient buildRestClient(List<InetSocketTransportAddress> hosts, int threadCount)
+      throws Exception {
+    return buildRestClient(hosts, threadCount, false, null, null, null, null, null, null);
+  }
+
+  /**
+   * Builds a REST client for {@code hosts}, using HTTPS with the given key and trust stores when
+   * {@code sslEnabled} is true. The store arguments are ignored when SSL is disabled.
+   */
+  //TODO: Support pass through of configuration (e.g. timeouts etc) of rest client from above
+  private static RestClient buildRestClient(List<InetSocketTransportAddress> hosts, int threadCount, boolean sslEnabled,
+      String keyStoreType, String keyStoreFilePassword, String identityFilepath, String trustStoreType,
+      String trustStoreFilePassword, String cacertsFilepath) throws Exception {
+
+    HttpHost[] httpHosts = new HttpHost[hosts.size()];
+    String scheme = sslEnabled ? "https" : "http";
+    for (int h = 0; h < httpHosts.length; h++) {
+      InetSocketTransportAddress host = hosts.get(h);
+      httpHosts[h] = new HttpHost(host.getAddress(), host.getPort(), scheme);
+    }
+
+    RestClientBuilder builder = RestClient.builder(httpHosts);
+
+    if (sslEnabled) {
+      log.info("ssl configuration: trustStoreType = {}, cacertsFilePath = {}", trustStoreType, cacertsFilepath);
+      KeyStore truststore = loadKeyStore(trustStoreType, cacertsFilepath, trustStoreFilePassword);
+      SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
+
+      log.info("ssl key configuration: keyStoreType = {}, keyFilePath = {}", keyStoreType, identityFilepath);
+      KeyStore keystore = loadKeyStore(keyStoreType, identityFilepath, keyStoreFilePassword);
+      sslBuilder.loadKeyMaterial(keystore, keyStoreFilePassword.toCharArray());
+
+      final SSLContext sslContext = sslBuilder.build();
+      // NOTE(review): NoopHostnameVerifier disables TLS hostname verification, so any certificate
+      // trusted by the truststore is accepted for any host. Consider making this configurable.
+      builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
+          // Set ssl context
+          .setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier())
+          // Configure number of threads for clients
+          .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
+    } else {
+      builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
+          // Configure number of threads for clients
+          .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
+    }
+
+    // Configure timeouts
+    builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
+        .setConnectionRequestTimeout(0)); // Important, otherwise the client has spurious timeouts
+
+    return builder.build();
+  }
+
+  /**
+   * Loads a key store of the given type from a file, closing the file even if loading fails.
+   *
+   * @param type key store type, as accepted by {@link KeyStore#getInstance(String)}
+   * @param path path of the key store file
+   * @param password password protecting the key store
+   * @return the loaded key store
+   * @throws Exception if the store cannot be created, read, or unlocked
+   */
+  private static KeyStore loadKeyStore(String type, String path, String password) throws Exception {
+    KeyStore keyStore = KeyStore.getInstance(type);
+    try (FileInputStream in = new FileInputStream(path)) {
+      keyStore.load(in, password.toCharArray());
+    }
+    return keyStore;
+  }
+
+  /**
+   * Sends {@code batch} as an asynchronous bulk request.
+   *
+   * @param batch the records to write
+   * @param callback optional callback notified when the bulk request completes
+   * @return a future completed with the write response
+   */
+  @Override
+  public Future<WriteResponse> write(final Batch<Object> batch, @Nullable WriteCallback callback) {
+
+    Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback);
+    try {
+      client.bulkAsync(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener());
+      return preparedBatch.getSecond().getFuture();
+    } catch (Exception e) {
+      throw new RuntimeException("Caught unexpected exception while calling bulkAsync API", e);
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // write() passes each batch to bulkAsync immediately, so nothing is held back here.
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      super.close();
+    } finally {
+      // Close the low-level REST client even if the base class fails to close.
+      this.lowLevelClient.close();
+    }
+  }
+
+  @VisibleForTesting
+  public RestHighLevelClient getRestHighLevelClient() {
+    return this.client;
+  }
+
+  @VisibleForTesting
+  public RestClient getRestLowLevelClient() {
+    return this.lowLevelClient;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java
new file mode 100644
index 0000000..bb26fb5
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.Batch;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.GenericWriteResponseWrapper;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+import org.apache.gobblin.writer.WriteResponseFuture;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+class ElasticsearchTransportClientWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter<Object> {
+
+ private final TransportClient client;
+
+ ElasticsearchTransportClientWriter(Config config) throws UnknownHostException {
+ super(config);
+ // Check if ssl is being configured, throw error that transport client does not support ssl
+ Preconditions.checkArgument(!ConfigUtils.getBoolean(config,
+ ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED, false),
+ "Transport client does not support ssl, try the Rest client instead");
+
+ this.client = createTransportClient(config);
+
+ log.info("ElasticsearchWriter configured successfully with: indexName={}, indexType={}, idMappingEnabled={}, typeMapperClassName={}",
+ this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper);
+ }
+
+ @Override
+ int getDefaultPort() {
+ return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_TRANSPORT_WRITER_DEFAULT_PORT;
+ }
+
+ @Override
+ public Future<WriteResponse> write(Batch<Object> batch, @Nullable WriteCallback callback) {
+
+ Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback);
+ client.bulk(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener());
+ return preparedBatch.getSecond().getFuture();
+
+ }
+
+ @Override
+ public void flush() throws IOException {
+ // Elasticsearch client doesn't support a flush method
+ }
+
+ @Override
+ public void close() throws IOException {
+ log.info("Got a close call in ElasticSearchTransportWriter");
+ super.close();
+ this.client.close();
+ }
+
+ @VisibleForTesting
+ TransportClient getTransportClient() {
+ return this.client;
+ }
+
+ private TransportClient createTransportClient(Config config) throws UnknownHostException {
+ TransportClient transportClient;
+
+ // Set TransportClient settings
+ Settings.Builder settingsBuilder = Settings.builder();
+ if (config.hasPath(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SETTINGS)) {
+ settingsBuilder.put(ConfigUtils.configToProperties(config,
+ ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SETTINGS));
+ }
+ settingsBuilder.put("client.transport.ignore_cluster_name",true);
+ settingsBuilder.put("client.transport.sniff", true);
+ transportClient = new PreBuiltTransportClient(settingsBuilder.build());
+ this.hostAddresses.forEach(transportClient::addTransportAddress);
+ return transportClient;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java
new file mode 100644
index 0000000..5238b50
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.elasticsearch.typemapping.JsonSerializer;
+import org.apache.gobblin.elasticsearch.typemapping.TypeMapper;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.Batch;
+import org.apache.gobblin.writer.WriteCallback;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A base class for different types of Elasticsearch writers
+ */
+@Slf4j
+public abstract class ElasticsearchWriterBase implements Closeable {
+ protected final String indexName;
+ protected final String indexType;
+ protected final TypeMapper typeMapper;
+ protected final JsonSerializer serializer;
+ protected final boolean idMappingEnabled;
+ protected final String idFieldName;
+ List<InetSocketTransportAddress> hostAddresses;
+ protected final MalformedDocPolicy malformedDocPolicy;
+
+ ElasticsearchWriterBase(Config config)
+ throws UnknownHostException {
+
+ this.indexName = config.getString(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME);
+ Preconditions.checkNotNull(this.indexName, "Index Name not provided. Please set "
+ + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME);
+ Preconditions.checkArgument(this.indexName.equals(this.indexName.toLowerCase()),
+ "Index name must be lowercase, you provided " + this.indexName);
+ this.indexType = config.getString(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE);
+ Preconditions.checkNotNull(this.indexName, "Index Type not provided. Please set "
+ + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE);
+ this.idMappingEnabled = ConfigUtils.getBoolean(config,
+ ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED,
+ ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_DEFAULT);
+ this.idFieldName = ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_FIELD,
+ ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_FIELD_DEFAULT);
+ String typeMapperClassName = ConfigUtils.getString(config,
+ ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+ ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS_DEFAULT);
+ if (typeMapperClassName.isEmpty()) {
+ throw new IllegalArgumentException(this.getClass().getCanonicalName() + " needs to be configured with "
+ + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS + " to enable type mapping");
+ }
+ try {
+ Class<?> typeMapperClass = (Class<?>) Class.forName(typeMapperClassName);
+
+ this.typeMapper = (TypeMapper) ConstructorUtils.invokeConstructor(typeMapperClass);
+ this.typeMapper.configure(config);
+ this.serializer = this.typeMapper.getSerializer();
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+ log.error("Failed to instantiate type-mapper from class " + typeMapperClassName, e);
+ throw Throwables.propagate(e);
+ }
+
+ this.malformedDocPolicy = MalformedDocPolicy.valueOf(ConfigUtils.getString(config,
+ ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY,
+ ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY_DEFAULT));
+
+ // If list is empty, connect to the default host and port
+ if (!config.hasPath(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS)) {
+ InetSocketTransportAddress hostAddress = new InetSocketTransportAddress(
+ InetAddress.getByName(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_DEFAULT_HOST),
+ getDefaultPort());
+ this.hostAddresses = new ArrayList<>(1);
+ this.hostAddresses.add(hostAddress);
+ log.info("Adding host {} to Elasticsearch writer", hostAddress);
+ } else {
+ // Get list of hosts
+ List<String> hosts = ConfigUtils.getStringList(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS);
+ // Add host addresses
+ Splitter hostSplitter = Splitter.on(":").trimResults();
+ this.hostAddresses = new ArrayList<>(hosts.size());
+ for (String host : hosts) {
+
+ List<String> hostSplit = hostSplitter.splitToList(host);
+ Preconditions.checkArgument(hostSplit.size() == 1 || hostSplit.size() == 2,
+ "Malformed host name for Elasticsearch writer: " + host + " host names must be of form [host] or [host]:[port]");
+
+ InetAddress hostInetAddress = InetAddress.getByName(hostSplit.get(0));
+ InetSocketTransportAddress hostAddress = null;
+
+ if (hostSplit.size() == 1) {
+ hostAddress = new InetSocketTransportAddress(hostInetAddress, this.getDefaultPort());
+ } else if (hostSplit.size() == 2) {
+ hostAddress = new InetSocketTransportAddress(hostInetAddress, Integer.parseInt(hostSplit.get(1)));
+ }
+ this.hostAddresses.add(hostAddress);
+ log.info("Adding host {} to Elasticsearch writer", hostAddress);
+ }
+ }
+ }
+
+ abstract int getDefaultPort();
+
+
+ protected Pair<BulkRequest, FutureCallbackHolder> prepareBatch(Batch<Object> batch, WriteCallback callback) {
+ BulkRequest bulkRequest = new BulkRequest();
+ final StringBuilder stringBuilder = new StringBuilder();
+ for (Object record : batch.getRecords()) {
+ try {
+ byte[] serializedBytes = this.serializer.serializeToJson(record);
+ log.debug("serialized record: {}", serializedBytes);
+ IndexRequest indexRequest = new IndexRequest(this.indexName, this.indexType)
+ .source(serializedBytes, 0, serializedBytes.length, XContentType.JSON);
+ if (this.idMappingEnabled) {
+ String id = this.typeMapper.getValue(this.idFieldName, record);
+ indexRequest.id(id);
+ stringBuilder.append(";").append(id);
+ }
+ bulkRequest.add(indexRequest);
+ }
+ catch (Exception e) {
+ log.error("Encountered exception {}", e);
+ }
+ }
+ FutureCallbackHolder futureCallbackHolder = new FutureCallbackHolder(callback,
+ exception -> log.error("Batch: {} failed on ids; {} with exception {}", batch.getId(),
+ stringBuilder.toString(), exception),
+ this.malformedDocPolicy);
+ return new Pair(bulkRequest, futureCallbackHolder);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.serializer.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java
new file mode 100644
index 0000000..0dad29d
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper;
+
+
+public class ElasticsearchWriterConfigurationKeys {
+
+ private static final String ELASTICSEARCH_WRITER_PREFIX = "writer.elasticsearch";
+
+ private static String prefix(String value) { return ELASTICSEARCH_WRITER_PREFIX + "." + value;};
+
+ public static final String ELASTICSEARCH_WRITER_SETTINGS = prefix("settings");
+ public static final String ELASTICSEARCH_WRITER_HOSTS = prefix("hosts");
+ public static final String ELASTICSEARCH_WRITER_INDEX_NAME = prefix("index.name");
+ public static final String ELASTICSEARCH_WRITER_INDEX_TYPE = prefix("index.type");
+ public static final String ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS = prefix("typeMapperClass");
+ public static final String ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS_DEFAULT = JsonTypeMapper.class.getCanonicalName();
+ public static final String ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED = prefix("useIdFromData");
+ public static final Boolean ELASTICSEARCH_WRITER_ID_MAPPING_DEFAULT = false;
+ public static final String ELASTICSEARCH_WRITER_ID_FIELD = prefix("idFieldName");
+ public static final String ELASTICSEARCH_WRITER_ID_FIELD_DEFAULT = "id";
+ public static final String ELASTICSEARCH_WRITER_CLIENT_TYPE = prefix("client.type");
+ public static final String ELASTICSEARCH_WRITER_CLIENT_TYPE_DEFAULT = "REST";
+ public static final String ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE = prefix("client.threadPoolSize");
+ public static final int ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_DEFAULT = 5;
+ public static final String ELASTICSEARCH_WRITER_SSL_ENABLED=prefix("ssl.enabled");
+ public static final boolean ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT=false;
+ public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE=prefix("ssl.keystoreType");
+ public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT = "pkcs12";
+ public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD=prefix("ssl.keystorePassword");
+ public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION=prefix("ssl.keystoreLocation");
+ public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE=prefix("ssl.truststoreType");
+ public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT = "jks";
+ public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION=prefix("ssl.truststoreLocation");
+ public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD=prefix("ssl.truststorePassword");
+ public static final String ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY = prefix("malformedDocPolicy");
+ public static final String ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY_DEFAULT = "FAIL";
+
+ //Async Writer Configuration
+ public static final String RETRIES_ENABLED = prefix("retriesEnabled");
+ public static final boolean RETRIES_ENABLED_DEFAULT = true;
+ public static final String MAX_RETRIES = prefix("maxRetries");
+ public static final int MAX_RETRIES_DEFAULT = 5;
+ static final String FAILURE_ALLOWANCE_PCT_CONFIG = prefix("failureAllowancePercentage");
+ static final double FAILURE_ALLOWANCE_PCT_DEFAULT = 0.0;
+
+ public enum ClientType {
+ TRANSPORT,
+ REST
+ }
+
+ public static final String ELASTICSEARCH_WRITER_DEFAULT_HOST = "localhost";
+ public static final int ELASTICSEARCH_TRANSPORT_WRITER_DEFAULT_PORT = 9300;
+ public static final int ELASTICSEARCH_REST_WRITER_DEFAULT_PORT = 9200;
+}