You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/09/01 05:31:44 UTC

[2/2] incubator-gobblin git commit: [GOBBLIN-17] Add Elasticsearch writer (rest + transport)

[GOBBLIN-17] Add Elasticsearch writer (rest + transport)

Closes #2419 from shirshanka/elastic


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f1bc746c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f1bc746c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f1bc746c

Branch: refs/heads/master
Commit: f1bc746ca50cffa1247c00b6c5bdd34b7321198d
Parents: ef438c8
Author: Shirshanka Das <sd...@linkedin.com>
Authored: Fri Aug 31 22:31:21 2018 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Fri Aug 31 22:31:45 2018 -0700

----------------------------------------------------------------------
 .../gobblin/writer/AsyncWriterManager.java      |   3 +
 .../java/org/apache/gobblin/writer/Batch.java   |  22 +-
 .../gobblin/writer/BufferedAsyncDataWriter.java |   4 +-
 .../gobblin/writer/BytesBoundedBatch.java       |  10 +-
 .../gobblin/writer/LargeMessagePolicy.java      |  26 ++
 .../gobblin/writer/RecordTooLargeException.java |  20 ++
 .../writer/SequentialBasedBatchAccumulator.java |  65 +++--
 .../gobblin-flavor-standard.gradle              |   1 +
 .../src/main/resources/wikipedia-elastic.conf   |  64 +++++
 .../gobblin-elasticsearch-deps/build.gradle     |  50 ++++
 .../gobblin-elasticsearch/build.gradle          |  76 ++++++
 .../scripts/install_test_deps.sh                |  40 +++
 .../scripts/uninstall_test_deps.sh              |  23 ++
 .../AvroGenericRecordSerializer.java            |  80 ++++++
 .../AvroGenericRecordTypeMapper.java            |  71 ++++++
 .../typemapping/FieldMappingException.java      |  35 +++
 .../typemapping/GsonJsonSerializer.java         |  52 ++++
 .../typemapping/JsonSerializer.java             |  30 +++
 .../typemapping/JsonTypeMapper.java             |  56 +++++
 .../typemapping/SerializationException.java     |  31 +++
 .../elasticsearch/typemapping/TypeMapper.java   |  36 +++
 .../writer/ElasticsearchDataWriterBuilder.java  |  83 +++++++
 .../writer/ElasticsearchRestWriter.java         | 232 ++++++++++++++++++
 .../ElasticsearchTransportClientWriter.java     | 118 +++++++++
 .../writer/ElasticsearchWriterBase.java         | 168 +++++++++++++
 .../ElasticsearchWriterConfigurationKeys.java   |  71 ++++++
 .../elasticsearch/writer/ExceptionLogger.java   |  26 ++
 .../writer/FutureCallbackHolder.java            | 193 +++++++++++++++
 .../writer/MalformedDocPolicy.java              |  26 ++
 .../elasticsearch/ElasticsearchTestServer.java  | 217 +++++++++++++++++
 .../ElasticsearchTestServerTest.java            |  50 ++++
 .../elasticsearch/writer/ConfigBuilder.java     |  72 ++++++
 .../ElasticsearchTransportClientWriterTest.java |  54 +++++
 .../writer/ElasticsearchWriterBaseTest.java     | 113 +++++++++
 .../ElasticsearchWriterIntegrationTest.java     | 243 +++++++++++++++++++
 .../elasticsearch/writer/RestWriterVariant.java |  97 ++++++++
 .../elasticsearch/writer/TestClient.java        |  37 +++
 .../writer/TransportWriterVariant.java          |  96 ++++++++
 .../elasticsearch/writer/WriterVariant.java     |  40 +++
 .../gobblin/test/AvroRecordGenerator.java       | 104 ++++++++
 .../gobblin/test/JsonRecordGenerator.java       |  75 ++++++
 .../org/apache/gobblin/test/PayloadType.java    |  27 +++
 .../gobblin/test/RecordTypeGenerator.java       |  43 ++++
 .../eventhub/writer/EventhubBatchTest.java      |  35 +--
 .../java/org/apache/gobblin/test/TestUtils.java |  21 ++
 gradle/scripts/globalDependencies.gradle        |  30 +--
 46 files changed, 3005 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
index 2be89c6..a599753 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
@@ -340,6 +340,8 @@ public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWrite
               .update(currTime - attempt.getPrevAttemptTimestampNanos(), TimeUnit.NANOSECONDS);
         }
         if (attempt.attemptNum <= AsyncWriterManager.this.numRetries) { // attempts must == numRetries + 1
+          log.debug("Attempt {} had failure: {}; re-enqueueing record: {}", attempt.attemptNum, throwable.getMessage(),
+              attempt.getRecord());
           attempt.incAttempt();
           attempt.setPrevAttemptFailure(throwable);
           AsyncWriterManager.this.retryQueue.get().add(attempt);
@@ -391,6 +393,7 @@ public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWrite
           Attempt attempt = this.retryQueue.take();
           if (attempt != null) {
             maybeSleep(attempt.getPrevAttemptTimestampNanos());
+            log.debug("Retry thread will retry record: {}", attempt.getRecord());
             attemptWrite(attempt);
           }
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
index ff16590..faf815c 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
@@ -127,17 +127,18 @@ public abstract class Batch<D>{
   /**
    * A method to check if the batch has the room to add a new record
    *  @param record: record needs to be added
+   *  @param largeMessagePolicy: the policy that is in effect for large messages
    *  @return Indicates if this batch still have enough space to hold a new record
    */
-  public abstract boolean hasRoom (D record);
+  public abstract boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy);
 
   /**
    * Add a record to this batch
    * <p>
    *   Implementation of this method should always ensure the record can be added successfully
-   *   The contract between {@link Batch#tryAppend(Object, WriteCallback)} and this method is this method
+   *   The contract between {@link Batch#tryAppend(Object, WriteCallback, LargeMessagePolicy)} and this method is this method
    *   is responsible for adding record to internal batch memory and the check for the room space is performed
-   *   by {@link Batch#hasRoom(Object)}. All the potential issues for adding a record should
+   *   by {@link Batch#hasRoom(Object, LargeMessagePolicy)}. All the potential issues for adding a record should
    *   already be resolved before this method is invoked.
    * </p>
    *
@@ -162,14 +163,19 @@ public abstract class Batch<D>{
    *
    *   @param record : record needs to be added
    *   @param callback : A callback which will be invoked when the whole batch gets sent and acknowledged
+   *   @param largeMessagePolicy : the {@link LargeMessagePolicy} that is in effect for this batch
    *   @return A future object which contains {@link RecordMetadata}
    */
-  public Future<RecordMetadata> tryAppend(D record, WriteCallback callback) {
-    if (!hasRoom(record)) {
-      LOG.debug ("Cannot add " + record + " to previous batch because the batch already has " + getCurrentSizeInByte() + " bytes");
+  public Future<RecordMetadata> tryAppend(D record, WriteCallback callback, LargeMessagePolicy largeMessagePolicy)
+      throws RecordTooLargeException {
+    if (!hasRoom(record, largeMessagePolicy)) {
+      LOG.debug ("Cannot add {} to previous batch because the batch already has {} bytes",
+          record, getCurrentSizeInByte());
+      if (largeMessagePolicy == LargeMessagePolicy.FAIL) {
+        throw new RecordTooLargeException();
+      }
       return null;
     }
-
     this.append(record);
     thunks.add(new Thunk(callback, getRecordSizeInByte(record)));
     RecordFuture future = new RecordFuture(latch, recordCount);
@@ -178,7 +184,9 @@ public abstract class Batch<D>{
   }
 
   public void await() throws InterruptedException{
+    LOG.debug("Batch {} waiting for {} records", this.id, this.recordCount);
     this.latch.await();
+    LOG.debug("Batch {} done with {} records", this.id, this.recordCount);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
index ceaffec..87039b6 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
@@ -39,7 +39,7 @@ import org.apache.gobblin.annotation.Alpha;
  * @param <D> data record type
  */
 @Alpha
-public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
+public class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
 
   private RecordProcessor<D> processor;
   private BatchAccumulator<D> accumulator;
@@ -136,7 +136,7 @@ public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
       return new WriteCallback<Object>() {
         @Override
         public void onSuccess(WriteResponse writeResponse) {
-          LOG.info ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
+          LOG.debug ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
           batch.onSuccess(writeResponse);
           batch.done();
           accumulator.deallocate(batch);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
index ef63882..7b6b4dc 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
@@ -62,7 +62,11 @@ public class BytesBoundedBatch<D> extends Batch<D>{
       records.add(record);
     }
 
-    boolean hasRoom (D record) {
+    boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy) {
+      if (records.isEmpty() && largeMessagePolicy == LargeMessagePolicy.ATTEMPT) {
+        // an empty batch always accepts one record, however large, so oversized records are still attempted
+        return true;
+      }
       long recordLen = BytesBoundedBatch.this.getInternalSize(record);
       return (byteSize + recordLen) <= BytesBoundedBatch.this.memSizeLimit;
     }
@@ -80,8 +84,8 @@ public class BytesBoundedBatch<D> extends Batch<D>{
     return memory.getRecords();
   }
 
-  public boolean hasRoom (D object) {
-    return memory.hasRoom(object);
+  public boolean hasRoom (D object, LargeMessagePolicy largeMessagePolicy) {
+    return memory.hasRoom(object, largeMessagePolicy);
   }
 
   public void append (D object) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java
new file mode 100644
index 0000000..28ca949
--- /dev/null
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+/**
+ * Describes how a single message that is larger than a batch's size limit should be treated.
+ */
+public enum LargeMessagePolicy {
+  DROP, // drop (and log) messages that exceed the threshold instead of delivering them
+  ATTEMPT, // still attempt delivery, e.g. BytesBoundedBatch accepts it as the sole record of an empty batch
+  FAIL // throw a RecordTooLargeException when a message exceeds the threshold
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java
new file mode 100644
index 0000000..845e6a8
--- /dev/null
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+/**
+ * Signals that a record does not fit into a {@link Batch} while {@link LargeMessagePolicy#FAIL} is in effect;
+ * see {@link Batch#tryAppend(Object, WriteCallback, LargeMessagePolicy)}.
+ */
+public class RecordTooLargeException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  public RecordTooLargeException() {
+    super();
+  }
+
+  public RecordTooLargeException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
index 9b7a608..58b0942 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
@@ -43,14 +43,16 @@ import org.apache.gobblin.util.ConfigUtils;
  * keeps in the deque until a TTL is expired.
  */
 
-public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {
+public class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {
 
+  private static final LargeMessagePolicy DEFAULT_LARGE_MESSAGE_POLICY = LargeMessagePolicy.FAIL;
   private Deque<BytesBoundedBatch<D>> dq = new LinkedList<>();
   private IncompleteRecordBatches incomplete = new IncompleteRecordBatches();
   private final long batchSizeLimit;
   private final long memSizeLimit;
   private final double tolerance = 0.95;
   private final long expireInMilliSecond;
+  private final LargeMessagePolicy largeMessagePolicy;
   private static final Logger LOG = LoggerFactory.getLogger(SequentialBasedBatchAccumulator.class);
 
   private final ReentrantLock dqLock = new ReentrantLock();
@@ -63,24 +65,31 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
   }
 
   public SequentialBasedBatchAccumulator(Properties properties) {
-    Config config = ConfigUtils.propertiesToConfig(properties);
-    this.batchSizeLimit = ConfigUtils.getLong(config, Batch.BATCH_SIZE,
-            Batch.BATCH_SIZE_DEFAULT);
-
-    this.expireInMilliSecond = ConfigUtils.getLong(config, Batch.BATCH_TTL,
-            Batch.BATCH_TTL_DEFAULT);
-
-    this.capacity = ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY,
-            Batch.BATCH_QUEUE_CAPACITY_DEFAULT);
+    this(ConfigUtils.propertiesToConfig(properties));
+  }
 
-    this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit);
+  public SequentialBasedBatchAccumulator(Config config) {
+    this(ConfigUtils.getLong(config, Batch.BATCH_SIZE,
+            Batch.BATCH_SIZE_DEFAULT),
+        ConfigUtils.getLong(config, Batch.BATCH_TTL,
+            Batch.BATCH_TTL_DEFAULT),
+        ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY,
+            Batch.BATCH_QUEUE_CAPACITY_DEFAULT));
   }
 
   public SequentialBasedBatchAccumulator(long batchSizeLimit, long expireInMilliSecond, long capacity) {
+    this(batchSizeLimit, expireInMilliSecond, capacity, DEFAULT_LARGE_MESSAGE_POLICY);
+  }
+
+  public SequentialBasedBatchAccumulator(long batchSizeLimit,
+      long expireInMilliSecond,
+      long capacity,
+      LargeMessagePolicy largeMessagePolicy) {
     this.batchSizeLimit = batchSizeLimit;
     this.expireInMilliSecond = expireInMilliSecond;
     this.capacity = capacity;
     this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit);
+    this.largeMessagePolicy = largeMessagePolicy;
   }
 
   public long getNumOfBatches () {
@@ -101,7 +110,12 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
     try {
       BytesBoundedBatch last = dq.peekLast();
       if (last != null) {
-        Future<RecordMetadata> future = last.tryAppend(record, callback);
+        Future<RecordMetadata> future = null;
+        try {
+          future = last.tryAppend(record, callback, this.largeMessagePolicy);
+        } catch (RecordTooLargeException e) {
+          // Expected when the record does not fit in the current batch; a new batch is tried below
+        }
         if (future != null) {
           return future;
         }
@@ -110,12 +124,18 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
       // Create a new batch because previous one has no space
       BytesBoundedBatch batch = new BytesBoundedBatch(this.memSizeLimit, this.expireInMilliSecond);
       LOG.debug("Batch " + batch.getId() + " is generated");
-      Future<RecordMetadata> future = batch.tryAppend(record, callback);
+      Future<RecordMetadata> future = null;
+      try {
+        future = batch.tryAppend(record, callback, this.largeMessagePolicy);
+      } catch (RecordTooLargeException e) {
+        // Even a new, empty batch was not able to accommodate the record
+        throw new RuntimeException("Failed due to a message that was too large", e);
+      }
 
-      // Even single record can exceed the batch size limit
-      // Ignore the record because Eventhub can only accept payload less than 256KB
+      // A null future means the record was dropped, which only happens with LargeMessagePolicy.DROP
       if (future == null) {
-        LOG.error("Batch " + batch.getId() + " is marked as complete because it contains a huge record: "
+        assert largeMessagePolicy.equals(LargeMessagePolicy.DROP);
+        LOG.error("Batch " + batch.getId() + " is silently marked as complete, dropping a huge record: "
                 + record);
         future = Futures.immediateFuture(new RecordMetadata(0));
         callback.onSuccess(WriteResponse.EMPTY);
@@ -124,6 +144,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
 
       // if queue is full, we should not add more
       while (dq.size() >= this.capacity) {
+        LOG.debug("Accumulator size {} is greater than capacity {}, waiting", dq.size(), this.capacity);
         this.notFull.await();
       }
       dq.addLast(batch);
@@ -187,7 +208,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
         return dq.poll();
       } else {
           while (dq.size() == 0) {
-            LOG.info ("ready to sleep because of queue is empty");
+            LOG.debug ("ready to sleep because of queue is empty");
             SequentialBasedBatchAccumulator.this.notEmpty.await();
             if (SequentialBasedBatchAccumulator.this.isClosed()) {
               return dq.poll();
@@ -203,7 +224,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
 
           if (dq.size() == 1) {
             if (dq.peekFirst().isTTLExpire()) {
-              LOG.info ("Batch " + dq.peekFirst().getId() + " is expired");
+              LOG.debug ("Batch {} is expired", dq.peekFirst().getId());
               BytesBoundedBatch candidate = dq.poll();
               SequentialBasedBatchAccumulator.this.notFull.signal();
               return candidate;
@@ -240,12 +261,16 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
   public void flush() {
     try {
       ArrayList<Batch> batches = this.incomplete.all();
-      LOG.info ("flush on {} batches", batches.size());
+      int numOutstandingRecords = 0;
+      for (Batch batch: batches) {
+        numOutstandingRecords += batch.getRecords().size();
+      }
+      LOG.debug ("Flush called on {} batches with {} records total", batches.size(), numOutstandingRecords);
       for (Batch batch: batches) {
         batch.await();
       }
     } catch (Exception e) {
-      LOG.info ("Error happens when flushing");
+      LOG.error ("Error happened while flushing batches", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-distribution/gobblin-flavor-standard.gradle
----------------------------------------------------------------------
diff --git a/gobblin-distribution/gobblin-flavor-standard.gradle b/gobblin-distribution/gobblin-flavor-standard.gradle
index c2061a5..2f544ca 100644
--- a/gobblin-distribution/gobblin-flavor-standard.gradle
+++ b/gobblin-distribution/gobblin-flavor-standard.gradle
@@ -21,4 +21,5 @@ dependencies {
   compile project(':gobblin-modules:gobblin-crypto-provider')
   compile project(':gobblin-modules:gobblin-kafka-08')
   compile project(':gobblin-modules:google-ingestion')
+  compile project(':gobblin-modules:gobblin-elasticsearch')
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-example/src/main/resources/wikipedia-elastic.conf
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/wikipedia-elastic.conf b/gobblin-example/src/main/resources/wikipedia-elastic.conf
new file mode 100644
index 0000000..9db386e
--- /dev/null
+++ b/gobblin-example/src/main/resources/wikipedia-elastic.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A sample pull file that pulls recent revisions of a Wikipedia page (Wikipedia:Sandbox)
+# and writes them to Elasticsearch using the REST client
+job {
+  name=PullFromWikipediaToElasticSearch
+  group=Wikipedia
+  description=Pull from Wikipedia and write to ElasticSearch
+}
+
+task.maxretries=0
+
+source {
+  class=org.apache.gobblin.example.wikipedia.WikipediaSource
+  page.titles="Wikipedia:Sandbox"
+  revisions.cnt=5
+}
+
+wikipedia {
+   api.rooturl="https://en.wikipedia.org/w/api.php"
+  avro.schema="{\"namespace\": \"example.wikipedia.avro\",\"type\": \"record\",\"name\": \"WikipediaArticle\",\"fields\": [{\"name\": \"revid\", \"type\": [\"double\", \"null\"]},{\"name\": \"pageid\", \"type\": [\"double\", \"null\"]},{\"name\": \"title\", \"type\": [\"string\", \"null\"]},{\"name\": \"user\", \"type\": [\"string\", \"null\"]},{\"name\": \"anon\", \"type\": [\"string\", \"null\"]},{\"name\": \"userid\",  \"type\": [\"double\", \"null\"]},{\"name\": \"timestamp\", \"type\": [\"string\", \"null\"]},{\"name\": \"size\",  \"type\": [\"double\", \"null\"]},{\"name\": \"contentformat\",  \"type\": [\"string\", \"null\"]},{\"name\": \"contentmodel\",  \"type\": [\"string\", \"null\"]},{\"name\": \"content\", \"type\": [\"string\", \"null\"]}]}"
+} 
+converter.classes=org.apache.gobblin.example.wikipedia.WikipediaConverter
+extract.namespace=org.apache.gobblin.example.wikipedia
+
+writer {
+  builder.class=org.apache.gobblin.elasticsearch.writer.ElasticsearchDataWriterBuilder
+  elasticsearch {
+    client.type=REST
+    index.name=wikipedia-test
+    index.type=docs
+    #hosts=hostname
+    #ssl {
+    #  enabled=true
+    #  keystoreType=pkcs12
+    #  keystorePassword=change_me
+    #  keystoreLocation=/path/to/keystore.p12
+    #  truststoreType=jks
+    #  truststoreLocation=/path/to/cacerts
+    #  truststorePassword=changeme
+    #}
+    typeMapperClass=org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper
+    useIdFromData=false  # change to true if you want to use a field from the record as the id field
+    #idFieldName=id      # change to the field of the record that you want to use as the id of the document
+  }
+}
+
+data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch-deps/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch-deps/build.gradle b/gobblin-modules/gobblin-elasticsearch-deps/build.gradle
new file mode 100644
index 0000000..35e9a36
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch-deps/build.gradle
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+buildscript {
+  repositories {
+    jcenter()
+  }
+  dependencies {
+    classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
+  }
+}
+
+apply plugin: 'com.github.johnrengelman.shadow'
+apply plugin: 'java'
+
+dependencies {
+  compile "org.elasticsearch.client:transport:5.6.8"
+  compile "org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.8"
+  compile "com.google.guava:guava:18.0"
+}
+
+configurations {
+  compile {
+    exclude group: "org.apache.hadoop"
+    exclude group: "com.sun.jersey.contribs"
+  }
+}
+
+// Relocate the bundled Guava into a private package so it does not clash with other Guava versions on the classpath
+shadowJar {
+  zip64 true
+  relocate 'com.google.common', 'shadow.gobblin.elasticsearch.com.google.common'
+}
+
+ext.classification="library"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/build.gradle b/gobblin-modules/gobblin-elasticsearch/build.gradle
new file mode 100644
index 0000000..2d624b2
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/build.gradle
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'java'
+
+dependencies {
+  compile project(":gobblin-api")
+  compile project(":gobblin-core-base")
+  compile project(":gobblin-utility")
+  compile project(":gobblin-metrics-libs:gobblin-metrics")
+  compile project(path: ":gobblin-modules:gobblin-elasticsearch-deps", configuration:"shadow")
+
+  compile "org.apache.logging.log4j:log4j-to-slf4j:2.7"
+  compile "org.slf4j:slf4j-api:1.7.21"
+  compile externalDependency.avro
+  compile externalDependency.jacksonCore
+  compile externalDependency.jacksonMapper
+  compile externalDependency.commonsHttpClient
+  compile externalDependency.commonsPool
+  compile externalDependency.commonsLang3
+  compile externalDependency.slf4j
+  compile externalDependency.httpclient
+  compile externalDependency.httpcore
+  compile externalDependency.lombok
+  compile externalDependency.metricsCore
+  compile externalDependency.typesafeConfig
+  compile externalDependency.findBugsAnnotations
+
+  testCompile project(":gobblin-runtime")
+  testCompile project(":gobblin-test-utils")
+  testCompile externalDependency.jsonAssert
+  testCompile externalDependency.mockito
+  testCompile externalDependency.testng
+}
+
+task installTestDependencies(type: Exec) {
+  workingDir project.projectDir
+  commandLine './scripts/install_test_deps.sh'
+}
+
+task uninstallTestDependencies(type: Exec) {
+  workingDir project.projectDir
+  commandLine './scripts/uninstall_test_deps.sh'
+}
+
+test.dependsOn installTestDependencies
+test.finalizedBy uninstallTestDependencies
+
+// compile dependencies are not resolved transitively: everything this module needs must be listed above
+configurations {
+  compile {
+    transitive = false
+  }
+}
+
+test {
+  workingDir rootProject.rootDir
+  maxParallelForks = 4
+}
+
+
+ext.classification="library"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh b/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh
new file mode 100755
index 0000000..48324da
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Downloads (only when the tarball is missing) and unpacks the Elasticsearch
+# distribution used by this module's tests into ./test-elasticsearch.
+# Any failure (download, extraction) aborts the script with a non-zero status.
+set -euo pipefail
+
+# Resolve all relative paths against the module directory (the parent of scripts/),
+# independent of the caller's current working directory.
+cd "$(dirname "$0")/.."
+
+TARGET_DIR="test-elasticsearch"
+ES_VERSION=5.6.8
+ES_DIR="${TARGET_DIR}/elasticsearch-${ES_VERSION}"
+ES_TAR="${TARGET_DIR}/elasticsearch-${ES_VERSION}.tar.gz"
+ES_URL="https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz"
+
+echo "${TARGET_DIR}"
+mkdir -p "${TARGET_DIR}"
+echo "${ES_URL}"
+echo "${ES_TAR}"
+
+# Start from a fresh extraction so leftovers from an earlier (possibly interrupted)
+# run cannot leak into the tests.
+if [ -d "${ES_DIR}" ]; then
+  echo "Cleaning up existing directory ${ES_DIR}"
+  rm -rf "${ES_DIR}"
+fi
+
+# Decide on the download by the tarball's presence, since the tarball is what is
+# extracted below; an existing directory says nothing about whether the tarball exists.
+if [ -f "${ES_TAR}" ]; then
+  echo "Skipping download since ${ES_TAR} already exists"
+else
+  echo "${ES_TAR} does not exist, downloading"
+  # -f: fail on HTTP errors instead of saving the error page as the tarball.
+  # -L: follow redirects. Download to a temporary name so that a partial download
+  # is never mistaken for a complete archive on the next run.
+  curl -fSL -o "${ES_TAR}.part" "${ES_URL}"
+  mv "${ES_TAR}.part" "${ES_TAR}"
+fi
+
+tar -xzf "${ES_TAR}" -C "${TARGET_DIR}"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh b/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh
new file mode 100755
index 0000000..db79f86
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Removes the ./test-elasticsearch directory (downloaded tarball and extracted
+# Elasticsearch distribution) created by install_test_deps.sh.
+set -euo pipefail
+
+# Resolve the path against the module directory (the parent of scripts/), so this
+# never deletes a same-named directory under some other working directory.
+cd "$(dirname "$0")/.."
+
+TARGET_DIR="test-elasticsearch"
+rm -rf -- "${TARGET_DIR}"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java
new file mode 100644
index 0000000..5242202
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link JsonSerializer} for {@link GenericRecord} objects.
+ *
+ * <p>Serialization relies on Avro's own {@code GenericRecord#toString()} rendering. This class
+ * holds no state, so a single instance can be shared and {@link #close()} has nothing to release.
+ */
+public class AvroGenericRecordSerializer implements JsonSerializer<GenericRecord> {
+
+  /** Creates a serializer; no resources are allocated. */
+  public AvroGenericRecordSerializer() {
+  }
+
+  /** No configuration is currently supported; {@code config} is ignored. */
+  @Override
+  public void configure(Config config) {
+  }
+
+  /**
+   * Serializes {@code serializable} to UTF-8 encoded JSON bytes.
+   *
+   * @param serializable the record to serialize
+   * @return the UTF-8 bytes of the record's JSON rendering
+   * @throws SerializationException if the record is {@code null} or cannot be rendered
+   */
+  @Override
+  public byte[] serializeToJson(GenericRecord serializable)
+      throws SerializationException {
+    try {
+      // Avro's toString() flattens optional (nullable union) fields, producing
+      // "id": "id-value" instead of the extra type-tagged nesting "id": {"string": "id-value"}.
+      // See https://issues.apache.org/jira/browse/AVRO-1582 for a discussion of this.
+      String serialized = serializable.toString();
+      return serialized.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      throw new SerializationException("Could not serializeToJson Avro record", exception);
+    }
+  }
+
+  /** Nothing to release: this serializer owns no resources. */
+  @Override
+  public void close()
+      throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java
new file mode 100644
index 0000000..0586f3c
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.IOException;
+
+import org.apache.avro.generic.GenericRecord;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link TypeMapper} for Avro {@link GenericRecord}s.
+ *
+ * <p>Records are serialized with an {@link AvroGenericRecordSerializer}; field values are read
+ * by top-level field name and rendered with {@code toString()}.
+ */
+@Slf4j
+public class AvroGenericRecordTypeMapper implements TypeMapper<GenericRecord> {
+
+  private final JsonSerializer<GenericRecord> serializer;
+  // Owns the serializer so that close() releases it.
+  private final Closer closer;
+
+  public AvroGenericRecordTypeMapper() {
+    this.closer = Closer.create();
+    this.serializer = this.closer.register(new AvroGenericRecordSerializer());
+  }
+
+  /** Forwards {@code config} to the serializer. */
+  @Override
+  public void configure(Config config) {
+    this.serializer.configure(config);
+    log.info("AvroGenericRecordTypeMapper successfully configured");
+  }
+
+  @Override
+  public JsonSerializer<GenericRecord> getSerializer() {
+    return this.serializer;
+  }
+
+  /**
+   * Returns the string form of the top-level field {@code fieldName} of {@code record}.
+   *
+   * @param fieldName name of a top-level field in the record's schema
+   * @param record the record to read from
+   * @return {@code toString()} of the field's value
+   * @throws FieldMappingException if the record is {@code null}, the field cannot be read, or the
+   *     field's value is {@code null}
+   */
+  @Override
+  public String getValue(String fieldName, GenericRecord record)
+      throws FieldMappingException {
+    if (record == null) {
+      throw new FieldMappingException("Could not find field " + fieldName + ": record is null");
+    }
+    Object value;
+    try {
+      value = record.get(fieldName);
+    } catch (RuntimeException e) {
+      // Some GenericRecord implementations may throw, rather than return null, for a name
+      // that is not in the schema.
+      throw new FieldMappingException("Could not find field " + fieldName, e);
+    }
+    if (value == null) {
+      // Checked explicitly instead of relying on a NullPointerException. Via get(String) a
+      // missing field may be indistinguishable from a field holding null.
+      throw new FieldMappingException("Could not find field " + fieldName + " (absent or null)");
+    }
+    return value.toString();
+  }
+
+  /** Closes the owned serializer. */
+  @Override
+  public void close()
+      throws IOException {
+    this.closer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java
new file mode 100644
index 0000000..781f918
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+/**
+ * Checked exception signalling that a field of a record could not be accessed or converted
+ * while mapping the record (for example, while reading an id field).
+ */
+public class FieldMappingException extends Exception {
+
+  /** Creates an exception carrying only a detail message. */
+  public FieldMappingException(String message) {
+    super(message);
+  }
+
+  /** Creates an exception with a detail message and the underlying cause. */
+  public FieldMappingException(String message, Exception cause) {
+    super(message, cause);
+  }
+
+  /** Wraps {@code cause}; the detail message is derived from the cause. */
+  public FieldMappingException(Exception cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java
new file mode 100644
index 0000000..d44986c
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+
+
+/**
+ * A Gson based {@link JsonSerializer} that accepts any object Gson can serialize.
+ */
+public class GsonJsonSerializer implements JsonSerializer<Object> {
+  // Gson instances are immutable and thread-safe, so one instance per serializer suffices.
+  private final Gson gson = new Gson();
+
+  /** No configuration is currently supported; {@code config} is ignored. */
+  @Override
+  public void configure(Config config) {
+  }
+
+  /**
+   * Serializes {@code serializable} with Gson and returns the UTF-8 encoded JSON.
+   *
+   * @param serializable the object to serialize; {@code null} yields the JSON literal {@code null}
+   * @return UTF-8 bytes of the JSON representation
+   * @throws SerializationException if Gson fails to serialize the object
+   */
+  @Override
+  public byte[] serializeToJson(Object serializable)
+      throws SerializationException {
+    try {
+      // StandardCharsets.UTF_8 can never be unsupported, unlike a charset looked up by name.
+      return this.gson.toJson(serializable).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    } catch (RuntimeException e) {
+      // Surface Gson's unchecked failures as the checked SerializationException the interface
+      // declares, as the Avro serializer in this package does.
+      throw new SerializationException("Could not serializeToJson object", e);
+    }
+  }
+
+  /** Nothing to release: this serializer owns no resources. */
+  @Override
+  public void close()
+      throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java
new file mode 100644
index 0000000..41f2885
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.Closeable;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * Converts records of type {@code T} into JSON, returned as a byte array.
+ *
+ * @param <T> the record type this serializer accepts
+ */
+public interface JsonSerializer<T> extends Closeable {
+
+  /**
+   * Serializes a single record.
+   *
+   * @param serializable the record to serialize
+   * @return the JSON encoding of {@code serializable}
+   * @throws SerializationException if the record cannot be serialized
+   */
+  byte[] serializeToJson(T serializable) throws SerializationException;
+
+  /**
+   * Applies configuration to this serializer.
+   *
+   * @param config configuration for this serializer
+   */
+  void configure(Config config);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java
new file mode 100644
index 0000000..8491147
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.IOException;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.typesafe.config.Config;
+
+
+/**
+ * A {@link TypeMapper} for Gson {@link JsonElement} records, which are expected to be JSON
+ * objects. Serialization is delegated to a {@link GsonJsonSerializer}.
+ */
+public class JsonTypeMapper implements TypeMapper<JsonElement> {
+
+  private final JsonSerializer<Object> serializer = new GsonJsonSerializer();
+
+  /** No configuration is currently supported; {@code config} is ignored. */
+  @Override
+  public void configure(Config config) {
+  }
+
+  /**
+   * Returns the Gson-backed serializer. That serializer accepts any {@code Object}, so exposing
+   * it as a serializer of {@link JsonElement} (a subtype of Object) is safe.
+   */
+  @Override
+  @SuppressWarnings("unchecked") // safe: the underlying serializer accepts every Object
+  public JsonSerializer<JsonElement> getSerializer() {
+    return (JsonSerializer<JsonElement>) (JsonSerializer<?>) this.serializer;
+  }
+
+  /**
+   * Returns the value of the top-level field {@code fieldName} as a string.
+   *
+   * @param fieldName name of a top-level member of the JSON object
+   * @param record the record to read from; must be a JSON object
+   * @return the field's value rendered by {@link JsonElement#getAsString()}
+   * @throws FieldMappingException if {@code record} is not a JSON object, the field is absent,
+   *     or its value cannot be rendered as a string (e.g. JSON null or a nested object)
+   */
+  @Override
+  public String getValue(String fieldName, JsonElement record)
+      throws FieldMappingException {
+    // Checked explicitly: a Java assert is disabled unless the JVM runs with -ea.
+    if (record == null || !record.isJsonObject()) {
+      throw new FieldMappingException("Could not find field :" + fieldName + " (record is not a JSON object)");
+    }
+    JsonObject jsonObject = record.getAsJsonObject();
+    if (!jsonObject.has(fieldName)) {
+      throw new FieldMappingException("Could not find field :" + fieldName);
+    }
+    try {
+      return jsonObject.get(fieldName).getAsString();
+    } catch (UnsupportedOperationException | IllegalStateException e) {
+      // getAsString() throws these for JSON null, objects, and arrays without exactly one element.
+      throw new FieldMappingException("Could not read field :" + fieldName + " as a string", e);
+    }
+  }
+
+  /** Nothing to release: the Gson serializer owns no resources. */
+  @Override
+  public void close()
+      throws IOException {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java
new file mode 100644
index 0000000..d2edb53
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+/**
+ * Checked exception raised by {@link JsonSerializer} implementations when a record cannot be
+ * converted into JSON.
+ */
+public class SerializationException extends Exception {
+
+  /** Creates an exception with a detail message and the underlying cause. */
+  public SerializationException(String message, Exception cause) {
+    super(message, cause);
+  }
+
+  /** Wraps {@code cause}; the detail message is derived from the cause. */
+  public SerializationException(Exception cause) {
+    super(cause);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java
new file mode 100644
index 0000000..5aa909b
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.Closeable;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * An interface that enables the ElasticSearch writer to work with different types of records.
+ * Implementations supply a {@link JsonSerializer} for the record type and read individual field
+ * values (for example a document id) from records.
+ *
+ * @param <T> the record type
+ */
+public interface TypeMapper<T> extends Closeable {
+
+  /**
+   * Returns the value of {@code fieldName} in {@code record} as a string.
+   *
+   * @param fieldName the field to read
+   * @param record the record to read from
+   * @return the field's value as a string
+   * @throws FieldMappingException if the value cannot be obtained
+   */
+  String getValue(String fieldName, T record) throws FieldMappingException;
+
+  /** Returns the serializer used to turn records of type {@code T} into JSON. */
+  JsonSerializer<T> getSerializer();
+
+  /** Applies configuration to this mapper. */
+  void configure(Config config);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java
new file mode 100644
index 0000000..cb6ed15
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncWriterManager;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.BufferedAsyncDataWriter;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.gobblin.writer.SequentialBasedBatchAccumulator;
+
+import com.google.gson.JsonObject;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.State;
+
+/**
+ * Builds an Elasticsearch {@link DataWriter}. Records are batched by a
+ * {@link SequentialBasedBatchAccumulator} and written by a REST or transport-client
+ * {@link BatchAsyncDataWriter}. The result is wrapped in an {@link AsyncWriterManager} that
+ * applies the configured failure allowance and retry settings.
+ */
+public class ElasticsearchDataWriterBuilder extends DataWriterBuilder {
+
+  @Override
+  public DataWriter build() throws IOException {
+
+    State state = this.destination.getProperties();
+    Properties taskProps = state.getProperties();
+    Config config = ConfigUtils.propertiesToConfig(taskProps);
+
+    // Validate the client type first so a bad setting fails before anything else is built.
+    ElasticsearchWriterConfigurationKeys.ClientType clientType = resolveClientType(config);
+
+    SequentialBasedBatchAccumulator<JsonObject> batchAccumulator = new SequentialBasedBatchAccumulator<>(taskProps);
+
+    BatchAsyncDataWriter asyncDataWriter;
+    switch (clientType) {
+      case REST:
+        asyncDataWriter = new ElasticsearchRestWriter(config);
+        break;
+      case TRANSPORT:
+        asyncDataWriter = new ElasticsearchTransportClientWriter(config);
+        break;
+      default:
+        // Only reachable if a new ClientType constant is added without a matching writer.
+        throw new IllegalArgumentException("Need to specify which "
+            + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE
+            + " client to use (rest/transport)");
+    }
+    BufferedAsyncDataWriter bufferedAsyncDataWriter = new BufferedAsyncDataWriter(batchAccumulator, asyncDataWriter);
+
+    // The config value is a percentage; AsyncWriterManager expects a ratio.
+    double failureAllowance = ConfigUtils.getDouble(config, ElasticsearchWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG,
+        ElasticsearchWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT) / 100.0;
+    boolean retriesEnabled = ConfigUtils.getBoolean(config, ElasticsearchWriterConfigurationKeys.RETRIES_ENABLED,
+        ElasticsearchWriterConfigurationKeys.RETRIES_ENABLED_DEFAULT);
+    int maxRetries = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.MAX_RETRIES,
+        ElasticsearchWriterConfigurationKeys.MAX_RETRIES_DEFAULT);
+
+    return AsyncWriterManager.builder()
+        .failureAllowanceRatio(failureAllowance)
+        .retriesEnabled(retriesEnabled)
+        .numRetries(maxRetries)
+        .config(config)
+        .asyncDataWriter(bufferedAsyncDataWriter)
+        .build();
+  }
+
+  /**
+   * Reads the configured client type, matching it case-insensitively against the enum constants.
+   *
+   * @param config the writer configuration
+   * @return the selected client type
+   * @throws IllegalArgumentException naming the key and the offending value if it is not a known
+   *     client type
+   */
+  private static ElasticsearchWriterConfigurationKeys.ClientType resolveClientType(Config config) {
+    String configured = ConfigUtils.getString(config,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE_DEFAULT);
+    try {
+      // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
+      return ElasticsearchWriterConfigurationKeys.ClientType.valueOf(configured.toUpperCase(java.util.Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Need to specify which "
+          + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE
+          + " client to use (rest/transport); got: " + configured, e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
new file mode 100644
index 0000000..7cd77da
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.password.PasswordManager;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.Batch;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.GenericWriteResponse;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+import org.apache.http.HttpHost;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link BatchAsyncDataWriter} that indexes batches of records into Elasticsearch through the
+ * REST client: a {@link RestHighLevelClient} layered on top of a low-level {@link RestClient}.
+ *
+ * <p>When {@link ElasticsearchWriterConfigurationKeys#ELASTICSEARCH_WRITER_SSL_ENABLED} is set, the
+ * client uses https and loads the configured keystore (client identity) and truststore.
+ */
+@Slf4j
+public class ElasticsearchRestWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter<Object> {
+
+  private final RestHighLevelClient client;
+  private final RestClient lowLevelClient;
+
+  /**
+   * Creates the writer and its REST clients.
+   *
+   * @param config writer configuration; see {@link ElasticsearchWriterConfigurationKeys}
+   * @throws IOException if a host cannot be resolved, or if the REST client cannot be built (for
+   *     example when the keystore/truststore cannot be read); in the latter case the original
+   *     failure is attached as the cause
+   */
+  ElasticsearchRestWriter(Config config)
+      throws IOException {
+    super(config);
+
+    int threadCount = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_DEFAULT);
+    try {
+      PasswordManager passwordManager = PasswordManager.getInstance();
+      boolean sslEnabled = ConfigUtils.getBoolean(config,
+          ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED,
+          ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT);
+      if (sslEnabled) {
+        // keystore (client identity); the password is resolved through the PasswordManager
+        String keyStoreType = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE,
+                ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT);
+        String keyStoreFilePassword = passwordManager.readPassword(ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD, ""));
+        String identityFilepath = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION, "");
+
+        // truststore (certificates trusted for the server side)
+        String trustStoreType = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE,
+                ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT);
+        String trustStoreFilePassword = passwordManager.readPassword(ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD, ""));
+        String cacertsFilepath = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION, "");
+        String truststoreAbsolutePath = Paths.get(cacertsFilepath).toAbsolutePath().normalize().toString();
+        log.info("Truststore absolutePath is:{}", truststoreAbsolutePath);
+
+        this.lowLevelClient =
+            buildRestClient(this.hostAddresses, threadCount, true, keyStoreType, keyStoreFilePassword, identityFilepath,
+                trustStoreType, trustStoreFilePassword, cacertsFilepath);
+      } else {
+        this.lowLevelClient = buildRestClient(this.hostAddresses, threadCount);
+      }
+      this.client = new RestHighLevelClient(this.lowLevelClient);
+
+      log.info("Elasticsearch Rest Writer configured successfully with: indexName={}, "
+              + "indexType={}, idMappingEnabled={}, typeMapperClassName={}, ssl={}",
+          this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper.getClass().getCanonicalName(),
+          sslEnabled);
+
+    } catch (Exception e) {
+      throw new IOException("Failed to instantiate rest elasticsearch client", e);
+    }
+  }
+
+  @Override
+  int getDefaultPort() {
+    return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_REST_WRITER_DEFAULT_PORT;
+  }
+
+  /**
+   * Builds a plain-http REST client for {@code hosts}.
+   */
+  private static RestClient buildRestClient(List<InetSocketTransportAddress> hosts, int threadCount)
+      throws Exception {
+    return buildRestClient(hosts, threadCount, false, null, null, null, null, null, null);
+  }
+
+  /**
+   * Builds a REST client for {@code hosts}.
+   *
+   * <p>The key/trust store arguments are only read when {@code sslEnabled} is true.
+   *
+   * @param hosts Elasticsearch nodes to connect to
+   * @param threadCount number of IO reactor threads used by the underlying async http client
+   * @param sslEnabled whether to use https with the given key and trust material
+   * @throws Exception if a store cannot be loaded or the SSL context cannot be built
+   */
+  //TODO: Support pass through of configuration (e.g. timeouts etc) of rest client from above
+  private static RestClient buildRestClient(List<InetSocketTransportAddress> hosts, int threadCount, boolean sslEnabled,
+      String keyStoreType, String keyStoreFilePassword, String identityFilepath, String trustStoreType,
+      String trustStoreFilePassword, String cacertsFilepath) throws Exception {
+
+    String scheme = sslEnabled ? "https" : "http";
+    HttpHost[] httpHosts = new HttpHost[hosts.size()];
+    for (int h = 0; h < httpHosts.length; h++) {
+      InetSocketTransportAddress host = hosts.get(h);
+      httpHosts[h] = new HttpHost(host.getAddress(), host.getPort(), scheme);
+    }
+
+    RestClientBuilder builder = RestClient.builder(httpHosts);
+    final IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(threadCount).build();
+
+    if (sslEnabled) {
+      log.info("ssl configuration: trustStoreType = {}, cacertsFilePath = {}", trustStoreType, cacertsFilepath);
+      KeyStore truststore = loadKeyStore(trustStoreType, cacertsFilepath, trustStoreFilePassword);
+      SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
+
+      log.info("ssl key configuration: keyStoreType = {}, keyFilePath = {}", keyStoreType, identityFilepath);
+      KeyStore keystore = loadKeyStore(keyStoreType, identityFilepath, keyStoreFilePassword);
+      sslBuilder.loadKeyMaterial(keystore, keyStoreFilePassword.toCharArray());
+
+      final SSLContext sslContext = sslBuilder.build();
+      // NOTE(review): NoopHostnameVerifier disables hostname verification, so any certificate trusted
+      // by the truststore is accepted for any host. Consider a real verifier for production use.
+      builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
+          .setSSLContext(sslContext)
+          .setSSLHostnameVerifier(new NoopHostnameVerifier())
+          .setDefaultIOReactorConfig(ioReactorConfig));
+    } else {
+      builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
+          .setDefaultIOReactorConfig(ioReactorConfig));
+    }
+
+    // Configure timeouts
+    builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
+        .setConnectionRequestTimeout(0)); // Important, otherwise the client has spurious timeouts
+
+    return builder.build();
+  }
+
+  /**
+   * Loads a {@link KeyStore} of the given type from {@code path}, closing the file afterwards.
+   */
+  private static KeyStore loadKeyStore(String type, String path, String password)
+      throws IOException, java.security.GeneralSecurityException {
+    KeyStore keyStore = KeyStore.getInstance(type);
+    try (FileInputStream inputStream = new FileInputStream(path)) {
+      keyStore.load(inputStream, password.toCharArray());
+    }
+    return keyStore;
+  }
+
+  /**
+   * Submits {@code batch} as one asynchronous bulk request.
+   *
+   * @return a future completed through the bulk request's action listener
+   * @throws RuntimeException if submitting the bulk request fails synchronously
+   */
+  @Override
+  public Future<WriteResponse> write(final Batch<Object> batch, @Nullable WriteCallback callback) {
+    Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback);
+    try {
+      client.bulkAsync(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener());
+      return preparedBatch.getSecond().getFuture();
+    } catch (Exception e) {
+      throw new RuntimeException("Caught unexpected exception while calling bulkAsync API", e);
+    }
+  }
+
+  /** No-op: each batch is sent as soon as {@link #write} is called. */
+  @Override
+  public void flush() throws IOException {
+  }
+
+  /**
+   * Closes the serializer (via the base class) and the REST client. The client is closed even if
+   * closing the base class fails.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      super.close();
+    } finally {
+      this.lowLevelClient.close();
+    }
+  }
+
+  @VisibleForTesting
+  public RestHighLevelClient getRestHighLevelClient() {
+    return this.client;
+  }
+
+  @VisibleForTesting
+  public RestClient getRestLowLevelClient() {
+    return this.lowLevelClient;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java
new file mode 100644
index 0000000..bb26fb5
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.Batch;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.GenericWriteResponseWrapper;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+import org.apache.gobblin.writer.WriteResponseFuture;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link BatchAsyncDataWriter} that indexes batches of records into Elasticsearch through the
+ * native {@link TransportClient}. SSL is not supported here; use {@link ElasticsearchRestWriter}
+ * for SSL connections.
+ */
+@Slf4j
+class ElasticsearchTransportClientWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter<Object> {
+
+  private final TransportClient client;
+
+  /**
+   * @param config writer configuration; see {@link ElasticsearchWriterConfigurationKeys}
+   * @throws UnknownHostException if a configured host cannot be resolved
+   * @throws IllegalArgumentException if SSL is enabled in {@code config}
+   */
+  ElasticsearchTransportClientWriter(Config config) throws UnknownHostException {
+    super(config);
+    // The transport client cannot do SSL, so reject SSL configuration up front.
+    Preconditions.checkArgument(!ConfigUtils.getBoolean(config,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT),
+        "Transport client does not support ssl, try the Rest client instead");
+
+    this.client = createTransportClient(config);
+
+    log.info("ElasticsearchWriter configured successfully with: indexName={}, indexType={}, idMappingEnabled={}, typeMapperClassName={}",
+        this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper.getClass().getCanonicalName());
+  }
+
+  @Override
+  int getDefaultPort() {
+    return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_TRANSPORT_WRITER_DEFAULT_PORT;
+  }
+
+  /**
+   * Submits {@code batch} as one asynchronous bulk request.
+   *
+   * @return a future completed through the bulk request's action listener
+   */
+  @Override
+  public Future<WriteResponse> write(Batch<Object> batch, @Nullable WriteCallback callback) {
+    Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback);
+    client.bulk(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener());
+    return preparedBatch.getSecond().getFuture();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // Elasticsearch client doesn't support a flush method
+  }
+
+  /**
+   * Closes the serializer (via the base class) and the transport client. The client is closed even
+   * if closing the base class fails.
+   */
+  @Override
+  public void close() throws IOException {
+    log.info("Got a close call in ElasticSearchTransportWriter");
+    try {
+      super.close();
+    } finally {
+      this.client.close();
+    }
+  }
+
+  @VisibleForTesting
+  TransportClient getTransportClient() {
+    return this.client;
+  }
+
+  /**
+   * Creates a transport client connected to {@link #hostAddresses}.
+   *
+   * <p>Settings found under {@link ElasticsearchWriterConfigurationKeys#ELASTICSEARCH_WRITER_SETTINGS}
+   * are passed through. {@code client.transport.ignore_cluster_name} and
+   * {@code client.transport.sniff} are then forced to {@code true}. Because they are put after the
+   * user settings, they presumably override any user value for the same keys.
+   */
+  private TransportClient createTransportClient(Config config) throws UnknownHostException {
+    Settings.Builder settingsBuilder = Settings.builder();
+    if (config.hasPath(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SETTINGS)) {
+      settingsBuilder.put(ConfigUtils.configToProperties(config,
+          ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SETTINGS));
+    }
+    settingsBuilder.put("client.transport.ignore_cluster_name", true);
+    settingsBuilder.put("client.transport.sniff", true);
+
+    TransportClient transportClient = new PreBuiltTransportClient(settingsBuilder.build());
+    this.hostAddresses.forEach(transportClient::addTransportAddress);
+    return transportClient;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java
new file mode 100644
index 0000000..5238b50
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.elasticsearch.typemapping.JsonSerializer;
+import org.apache.gobblin.elasticsearch.typemapping.TypeMapper;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.Batch;
+import org.apache.gobblin.writer.WriteCallback;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A base class for different types of Elasticsearch writers
+ */
+@Slf4j
+public abstract class ElasticsearchWriterBase implements Closeable {
+  protected final String indexName;
+  protected final String indexType;
+  protected final TypeMapper typeMapper;
+  protected final JsonSerializer serializer;
+  protected final boolean idMappingEnabled;
+  protected final String idFieldName;
+  List<InetSocketTransportAddress> hostAddresses;
+  protected final MalformedDocPolicy malformedDocPolicy;
+
+  ElasticsearchWriterBase(Config config)
+      throws UnknownHostException {
+
+    this.indexName = config.getString(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME);
+    Preconditions.checkNotNull(this.indexName, "Index Name not provided. Please set "
+        + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME);
+    Preconditions.checkArgument(this.indexName.equals(this.indexName.toLowerCase()),
+        "Index name must be lowercase, you provided " + this.indexName);
+    this.indexType = config.getString(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE);
+    Preconditions.checkNotNull(this.indexName, "Index Type not provided. Please set "
+        + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE);
+    this.idMappingEnabled = ConfigUtils.getBoolean(config,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_DEFAULT);
+    this.idFieldName = ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_FIELD,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_FIELD_DEFAULT);
+    String typeMapperClassName = ConfigUtils.getString(config,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS_DEFAULT);
+    if (typeMapperClassName.isEmpty()) {
+      throw new IllegalArgumentException(this.getClass().getCanonicalName() + " needs to be configured with "
+          + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS + " to enable type mapping");
+    }
+    try {
+      Class<?> typeMapperClass = (Class<?>) Class.forName(typeMapperClassName);
+
+      this.typeMapper = (TypeMapper) ConstructorUtils.invokeConstructor(typeMapperClass);
+      this.typeMapper.configure(config);
+      this.serializer = this.typeMapper.getSerializer();
+    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+      log.error("Failed to instantiate type-mapper from class " + typeMapperClassName, e);
+      throw Throwables.propagate(e);
+    }
+
+    this.malformedDocPolicy = MalformedDocPolicy.valueOf(ConfigUtils.getString(config,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY_DEFAULT));
+
+    // If list is empty, connect to the default host and port
+    if (!config.hasPath(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS)) {
+      InetSocketTransportAddress hostAddress = new InetSocketTransportAddress(
+          InetAddress.getByName(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_DEFAULT_HOST),
+          getDefaultPort());
+      this.hostAddresses = new ArrayList<>(1);
+      this.hostAddresses.add(hostAddress);
+      log.info("Adding host {} to Elasticsearch writer", hostAddress);
+    } else {
+      // Get list of hosts
+      List<String> hosts = ConfigUtils.getStringList(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS);
+      // Add host addresses
+      Splitter hostSplitter = Splitter.on(":").trimResults();
+      this.hostAddresses = new ArrayList<>(hosts.size());
+      for (String host : hosts) {
+
+        List<String> hostSplit = hostSplitter.splitToList(host);
+        Preconditions.checkArgument(hostSplit.size() == 1 || hostSplit.size() == 2,
+            "Malformed host name for Elasticsearch writer: " + host + " host names must be of form [host] or [host]:[port]");
+
+        InetAddress hostInetAddress = InetAddress.getByName(hostSplit.get(0));
+        InetSocketTransportAddress hostAddress = null;
+
+        if (hostSplit.size() == 1) {
+          hostAddress = new InetSocketTransportAddress(hostInetAddress, this.getDefaultPort());
+        } else if (hostSplit.size() == 2) {
+          hostAddress = new InetSocketTransportAddress(hostInetAddress, Integer.parseInt(hostSplit.get(1)));
+        }
+        this.hostAddresses.add(hostAddress);
+        log.info("Adding host {} to Elasticsearch writer", hostAddress);
+      }
+    }
+  }
+
+  abstract int getDefaultPort();
+
+
+  protected Pair<BulkRequest, FutureCallbackHolder> prepareBatch(Batch<Object> batch, WriteCallback callback) {
+    BulkRequest bulkRequest = new BulkRequest();
+    final StringBuilder stringBuilder = new StringBuilder();
+    for (Object record : batch.getRecords()) {
+      try {
+        byte[] serializedBytes = this.serializer.serializeToJson(record);
+        log.debug("serialized record: {}", serializedBytes);
+        IndexRequest indexRequest = new IndexRequest(this.indexName, this.indexType)
+            .source(serializedBytes, 0, serializedBytes.length, XContentType.JSON);
+        if (this.idMappingEnabled) {
+          String id = this.typeMapper.getValue(this.idFieldName, record);
+          indexRequest.id(id);
+          stringBuilder.append(";").append(id);
+        }
+        bulkRequest.add(indexRequest);
+      }
+      catch (Exception e) {
+        log.error("Encountered exception {}", e);
+      }
+    }
+    FutureCallbackHolder futureCallbackHolder = new FutureCallbackHolder(callback,
+        exception -> log.error("Batch: {} failed on ids; {} with exception {}", batch.getId(),
+            stringBuilder.toString(), exception),
+        this.malformedDocPolicy);
+    return new Pair(bulkRequest, futureCallbackHolder);
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.serializer.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java
new file mode 100644
index 0000000..0dad29d
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper;
+
+
+public class ElasticsearchWriterConfigurationKeys {
+
+  private static final String ELASTICSEARCH_WRITER_PREFIX = "writer.elasticsearch";
+
+  private static String prefix(String value) { return ELASTICSEARCH_WRITER_PREFIX + "." + value;};
+
+  public static final String ELASTICSEARCH_WRITER_SETTINGS = prefix("settings");
+  public static final String ELASTICSEARCH_WRITER_HOSTS = prefix("hosts");
+  public static final String ELASTICSEARCH_WRITER_INDEX_NAME = prefix("index.name");
+  public static final String ELASTICSEARCH_WRITER_INDEX_TYPE = prefix("index.type");
+  public static final String ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS = prefix("typeMapperClass");
+  public static final String ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS_DEFAULT = JsonTypeMapper.class.getCanonicalName();
+  public static final String ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED = prefix("useIdFromData");
+  public static final Boolean ELASTICSEARCH_WRITER_ID_MAPPING_DEFAULT = false;
+  public static final String ELASTICSEARCH_WRITER_ID_FIELD = prefix("idFieldName");
+  public static final String ELASTICSEARCH_WRITER_ID_FIELD_DEFAULT = "id";
+  public static final String ELASTICSEARCH_WRITER_CLIENT_TYPE = prefix("client.type");
+  public static final String ELASTICSEARCH_WRITER_CLIENT_TYPE_DEFAULT = "REST";
+  public static final String ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE = prefix("client.threadPoolSize");
+  public static final int ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_DEFAULT = 5;
+  public static final String ELASTICSEARCH_WRITER_SSL_ENABLED=prefix("ssl.enabled");
+  public static final boolean ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT=false;
+  public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE=prefix("ssl.keystoreType");
+  public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT = "pkcs12";
+  public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD=prefix("ssl.keystorePassword");
+  public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION=prefix("ssl.keystoreLocation");
+  public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE=prefix("ssl.truststoreType");
+  public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT = "jks";
+  public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION=prefix("ssl.truststoreLocation");
+  public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD=prefix("ssl.truststorePassword");
+  public static final String ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY = prefix("malformedDocPolicy");
+  public static final String ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY_DEFAULT = "FAIL";
+
+  //Async Writer Configuration
+  public static final String RETRIES_ENABLED = prefix("retriesEnabled");
+  public static final boolean RETRIES_ENABLED_DEFAULT = true;
+  public static final String MAX_RETRIES = prefix("maxRetries");
+  public static final int MAX_RETRIES_DEFAULT = 5;
+  static final String FAILURE_ALLOWANCE_PCT_CONFIG = prefix("failureAllowancePercentage");
+  static final double FAILURE_ALLOWANCE_PCT_DEFAULT = 0.0;
+
+  public enum ClientType {
+    TRANSPORT,
+    REST
+  }
+
+  public static final String ELASTICSEARCH_WRITER_DEFAULT_HOST = "localhost";
+  public static final int ELASTICSEARCH_TRANSPORT_WRITER_DEFAULT_PORT = 9300;
+  public static final int ELASTICSEARCH_REST_WRITER_DEFAULT_PORT = 9200;
+}