You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/09/01 05:31:43 UTC

[1/2] incubator-gobblin git commit: [GOBBLIN-17] Add Elasticsearch writer (rest + transport)

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ef438c872 -> f1bc746ca


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java
new file mode 100644
index 0000000..3a238ad
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+/**
+ * An interface to log Exceptions
+ */
+public interface ExceptionLogger {
+
+  void log(Exception exception);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java
new file mode 100644
index 0000000..f592ffa
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.writer.GenericWriteResponse;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
+
+import javax.annotation.Nullable;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A class to hold Futures and Callbacks to support Async writes
+ */
+@Slf4j
+public class FutureCallbackHolder {
+
+  @Getter
+  private final ActionListener<BulkResponse> actionListener;
+  private final BlockingQueue<Pair<WriteResponse, Throwable>> writeResponseQueue = new ArrayBlockingQueue<>(1);
+  @Getter
+  private final Future<WriteResponse> future;
+  private final AtomicBoolean done = new AtomicBoolean(false);
+
+  public FutureCallbackHolder(final @Nullable WriteCallback callback,
+      ExceptionLogger exceptionLogger,
+      final MalformedDocPolicy malformedDocPolicy) {
+    this.future = new Future<WriteResponse>() {
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return false;
+      }
+
+      @Override
+      public boolean isDone() {
+        return done.get();
+      }
+
+      @Override
+      public WriteResponse get()
+          throws InterruptedException, ExecutionException {
+        Pair<WriteResponse, Throwable> writeResponseThrowablePair = writeResponseQueue.take();
+        return getWriteResponseorThrow(writeResponseThrowablePair);
+      }
+
+      @Override
+      public WriteResponse get(long timeout, TimeUnit unit)
+          throws InterruptedException, ExecutionException, TimeoutException {
+        Pair<WriteResponse, Throwable> writeResponseThrowablePair = writeResponseQueue.poll(timeout, unit);
+        if (writeResponseThrowablePair == null) {
+          throw new TimeoutException("Timeout exceeded while waiting for future to be done");
+        } else {
+          return getWriteResponseorThrow(writeResponseThrowablePair);
+        }
+      }
+    };
+
+    this.actionListener = new ActionListener<BulkResponse>() {
+      @Override
+      public void onResponse(BulkResponse bulkItemResponses) {
+        if (bulkItemResponses.hasFailures()) {
+          boolean logicalErrors = false;
+          boolean serverErrors = false;
+          for (BulkItemResponse bulkItemResponse: bulkItemResponses) {
+            if (bulkItemResponse.isFailed()) {
+              // check if the failure is permanent (logical) or transient (server)
+              if (isLogicalError(bulkItemResponse)) {
+                // check error policy
+                switch (malformedDocPolicy) {
+                  case IGNORE: {
+                    log.debug("Document id {} was malformed with error {}",
+                        bulkItemResponse.getId(),
+                        bulkItemResponse.getFailureMessage());
+                    break;
+                  }
+                  case WARN: {
+                    log.warn("Document id {} was malformed with error {}",
+                        bulkItemResponse.getId(),
+                        bulkItemResponse.getFailureMessage());
+                    break;
+                  }
+                  default: {
+                    // Pass through
+                  }
+                }
+                logicalErrors = true;
+              } else {
+                serverErrors = true;
+              }
+            }
+          }
+          if (serverErrors) {
+            onFailure(new RuntimeException("Partial failures in the batch: " + bulkItemResponses.buildFailureMessage()));
+          } else if (logicalErrors) {
+            // all errors found were logical, throw RuntimeException if policy says to Fail
+            switch (malformedDocPolicy) {
+              case FAIL: {
+                onFailure(new RuntimeException("Partial non-recoverable failures in the batch. To ignore these, set "
+                    + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY + " to "
+                    + MalformedDocPolicy.IGNORE.name()));
+                break;
+              }
+              default: {
+                WriteResponse writeResponse = new GenericWriteResponse<BulkResponse>(bulkItemResponses);
+                writeResponseQueue.add(new Pair<WriteResponse, Throwable>(writeResponse, null));
+                if (callback != null) {
+                  callback.onSuccess(writeResponse);
+                }
+              }
+            }
+          }
+        } else {
+          WriteResponse writeResponse = new GenericWriteResponse<BulkResponse>(bulkItemResponses);
+          writeResponseQueue.add(new Pair<WriteResponse, Throwable>(writeResponse, null));
+          if (callback != null) {
+            callback.onSuccess(writeResponse);
+          }
+        }
+      }
+
+      private boolean isLogicalError(BulkItemResponse bulkItemResponse) {
+        String failureMessage = bulkItemResponse.getFailureMessage();
+        return failureMessage.contains("IllegalArgumentException")
+            || failureMessage.contains("illegal_argument_exception")
+            || failureMessage.contains("MapperParsingException")
+            || failureMessage.contains("mapper_parsing_exception");
+      }
+
+      @Override
+      public void onFailure(Exception exception) {
+        writeResponseQueue.add(new Pair<WriteResponse, Throwable>(null, exception));
+        if (exceptionLogger != null) {
+          exceptionLogger.log(exception);
+        }
+        if (callback != null) {
+          callback.onFailure(exception);
+        }
+      }
+    };
+  }
+
+
+  private WriteResponse getWriteResponseorThrow(Pair<WriteResponse, Throwable> writeResponseThrowablePair)
+      throws ExecutionException {
+    try {
+      if (writeResponseThrowablePair.getFirst() != null) {
+        return writeResponseThrowablePair.getFirst();
+      } else if (writeResponseThrowablePair.getSecond() != null) {
+        throw new ExecutionException(writeResponseThrowablePair.getSecond());
+      } else {
+        throw new ExecutionException(new RuntimeException("Could not find non-null WriteResponse pair"));
+      }
+    } finally {
+      done.set(true);
+    }
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java
new file mode 100644
index 0000000..4449d60
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+/**
+ * A class to represent different policies for handling malformed documents
+ */
+public enum MalformedDocPolicy {
+  IGNORE,  // Ignore on failure
+  WARN,    // Log warning on failure
+  FAIL     // Fail on failure
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java
new file mode 100644
index 0000000..fb11d8d
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.test.TestUtils;
+import org.testng.Assert;
+
+import com.google.common.base.Throwables;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A Test ElasticSearch server
+ */
+@Slf4j
+@NotThreadSafe
+public class ElasticsearchTestServer {
+
+
+  private static final String ELASTICSEARCH_VERSION="5.6.8";
+  private static final String TEST_ROOT_DIR="gobblin-modules/gobblin-elasticsearch/test-elasticsearch/";
+  // The clean elasticsearch instance is installed here
+  private static final String BASE_ELASTICSEARCH_INSTALL =TEST_ROOT_DIR + "elasticsearch-" + ELASTICSEARCH_VERSION;
+  // Per-test elasticsearch instances are installed under a different directory
+  private static final String TEST_INSTALL_PREFIX =TEST_ROOT_DIR + "es-test-install-";
+  private static final String ELASTICSEARCH_BIN="/bin/elasticsearch";
+  private static final String ELASTICSEARCH_CONFIG_FILE= "/config/elasticsearch.yml";
+  private static final String ELASTICSEARCH_JVMOPTS_FILE="/config/jvm.options";
+  private final String _testId;
+  private final int _tcpPort;
+  private Process elasticProcess;
+  private final int _httpPort;
+  private String _pid = ManagementFactory.getRuntimeMXBean().getName();
+  private final String _testInstallDirectory;
+  private AtomicBoolean _started = new AtomicBoolean(false);
+
+  public ElasticsearchTestServer(String testId)
+      throws IOException {
+    this(testId, TestUtils.findFreePort(), TestUtils.findFreePort());
+  }
+
+  private ElasticsearchTestServer(String testId, int httpPort, int tcpPort)
+      throws IOException {
+    _testId = testId;
+    _httpPort = httpPort;
+    _tcpPort = tcpPort;
+    _testInstallDirectory = TEST_INSTALL_PREFIX + _testId;
+    try {
+      createInstallation();
+    }
+    catch (Exception e) {
+      throw new IOException("Failed to create a test installation of elasticsearch", e);
+    }
+    configure();
+  }
+
+  public ElasticsearchTestServer()
+      throws IOException {
+    this(TestUtils.generateRandomAlphaString(25));
+  }
+
+  private void createInstallation()
+      throws IOException {
+    File srcDir = new File(BASE_ELASTICSEARCH_INSTALL);
+    if (!srcDir.exists()) {
+      throw new IOException("Could not find base elasticsearch instance installed at " + srcDir.getAbsolutePath() + "\n"
+          + "Run ./gradlew :gobblin-modules:gobblin-elasticsearch:installTestDependencies before running this test");
+    }
+    File destDir = new File(_testInstallDirectory);
+    log.debug("About to recreate directory : {}", destDir.getPath());
+    if (destDir.exists()) {
+      org.apache.commons.io.FileUtils.deleteDirectory(destDir);
+    }
+
+    String[] commands = {"cp", "-r", srcDir.getAbsolutePath(), destDir.getAbsolutePath()};
+    try {
+      log.debug("{}: Will run command: {}", this._pid, Arrays.toString(commands));
+      Process copyProcess = new ProcessBuilder().inheritIO().command(commands).start();
+      copyProcess.waitFor();
+    } catch (Exception e) {
+      log.error("Failed to create installation directory at {}", destDir.getPath(), e);
+      Throwables.propagate(e);
+    }
+  }
+
+
+
+
+  private void configure() throws IOException {
+    File configFile = new File(_testInstallDirectory + ELASTICSEARCH_CONFIG_FILE);
+    FileOutputStream configFileStream = new FileOutputStream(configFile);
+    try {
+      configFileStream.write(("cluster.name: " + _testId + "\n").getBytes("UTF-8"));
+      configFileStream.write(("http.port: " + _httpPort + "\n").getBytes("UTF-8"));
+      configFileStream.write(("transport.tcp.port: " + _tcpPort + "\n").getBytes("UTF-8"));
+    }
+    finally {
+      configFileStream.close();
+    }
+
+    File jvmConfigFile = new File(_testInstallDirectory + ELASTICSEARCH_JVMOPTS_FILE);
+    try (Stream<String> lines = Files.lines(jvmConfigFile.toPath())) {
+      List<String> newLines = lines.map(line -> line.replaceAll("^\\s*(-Xm[s,x]).*$", "$1128m"))
+      .collect(Collectors.toList());
+      Files.write(jvmConfigFile.toPath(), newLines);
+    }
+  }
+
+  public void start(int maxStartupTimeSeconds)
+  {
+    if (_started.get()) {
+      log.warn("ElasticSearch server has already been attempted to be started... returning without doing anything");
+      return;
+    }
+    _started.set(true);
+
+    log.error("{}: Starting elasticsearch server on port {}", this._pid, this._httpPort);
+    String[] commands = {_testInstallDirectory + ELASTICSEARCH_BIN};
+
+    try {
+      log.error("{}: Will run command: {}", this._pid, Arrays.toString(commands));
+      elasticProcess = new ProcessBuilder().inheritIO().command(commands).start();
+      if (elasticProcess != null) {
+        // register destroy of process on shutdown in-case of unclean test termination
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+          public void run() {
+            if (elasticProcess!=null) {
+              elasticProcess.destroy();
+            }
+          }
+        });
+      }
+    } catch (Exception e) {
+      log.error("Failed to start elasticsearch server", e);
+      Throwables.propagate(e);
+    }
+
+    boolean isUp = false;
+    int numTries = maxStartupTimeSeconds * 2;
+    while (!isUp && numTries-- > 0) {
+      try {
+        Thread.sleep(500); // wait 1/2 second
+        isUp = isUp();
+      } catch (Exception e) {
+
+      }
+    }
+    Assert.assertTrue(isUp, "Server is not up!");
+  }
+
+
+  public boolean isUp()
+  {
+    try {
+      URL url = new URL("http://localhost:" + _httpPort + "/_cluster/health?wait_for_status=green");
+      long startTime = System.nanoTime();
+      HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
+      int responseCode = httpURLConnection.getResponseCode();
+      log.info("Duration: {} seconds, Response code = {}",
+          (System.nanoTime() - startTime) / 1000000000.0,
+          responseCode);
+      if (responseCode == 200) { return true; } else {return false;}
+    }
+    catch (Exception e) {
+      Throwables.propagate(e);
+      return false;
+    }
+  }
+
+  public int getTransportPort() {
+    return _tcpPort;
+  }
+
+
+  public int getHttpPort() { return _httpPort; }
+
+
+  public void stop() {
+    if (elasticProcess != null) {
+      try {
+        elasticProcess.destroy();
+        elasticProcess = null; // set to null to prevent redundant call to destroy on shutdown
+      } catch (Exception e) {
+        log.warn("Failed to stop the ElasticSearch server", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java
new file mode 100644
index 0000000..dc3294d
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch;
+
+import java.io.IOException;
+
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+
+/**
+ * A Test to test that the {@link ElasticsearchTestServer} class does what it is supposed to do
+ */
+public class ElasticsearchTestServerTest {
+
+
+  ElasticsearchTestServer _elasticsearchTestServer;
+
+  @BeforeSuite
+  public void startServer()
+      throws IOException {
+    _elasticsearchTestServer = new ElasticsearchTestServer();
+    _elasticsearchTestServer.start(60);
+  }
+  @Test
+  public void testServerStart()
+      throws InterruptedException, IOException {
+      _elasticsearchTestServer.start(60); // second start should be a no-op
+  }
+
+  @AfterSuite
+  public void stopServer() {
+    _elasticsearchTestServer.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java
new file mode 100644
index 0000000..f12528d
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.util.Properties;
+
+import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper;
+import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+
+/**
+ * A helper class to build Config for Elasticsearch Writers
+ */
+@Accessors(chain=true)
+public class ConfigBuilder {
+  @Setter
+  String indexName;
+  @Setter
+  String indexType;
+  @Setter
+  int httpPort;
+  @Setter
+  int transportPort;
+  @Setter
+  boolean idMappingEnabled = true;
+  @Setter
+  String clientType = "REST";
+  @Setter
+  String typeMapperClassName;
+  @Setter
+  MalformedDocPolicy malformedDocPolicy;
+
+  Config build() {
+    Properties props = new Properties();
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE, clientType);
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, indexName);
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, indexType);
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED,
+        "" + idMappingEnabled);
+    if (this.clientType.equalsIgnoreCase("rest")) {
+      props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS, "localhost:" + httpPort);
+    } else if (this.clientType.equalsIgnoreCase("transport")) {
+      props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS, "localhost:" + transportPort);
+    } else throw new RuntimeException("Client type needs to be one of rest/transport");
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, typeMapperClassName);
+    if (malformedDocPolicy != null) {
+      props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY,
+          malformedDocPolicy.toString().toUpperCase());
+    }
+    return ConfigFactory.parseProperties(props);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java
new file mode 100644
index 0000000..46ae680
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.net.UnknownHostException;
+import java.util.Properties;
+
+import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class ElasticsearchTransportClientWriterTest {
+
+  @Test
+  public void testBadSslConfiguration()
+      throws UnknownHostException {
+    Properties props = new Properties();
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "test");
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, "test");
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED, "true");
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED, "true");
+    Config config = ConfigFactory.parseProperties(props);
+    try {
+      new ElasticsearchTransportClientWriter(config);
+      Assert.fail("Writer should not be constructed");
+    }
+    catch (Exception e) {
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java
new file mode 100644
index 0000000..341cd76
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.net.UnknownHostException;
+import java.util.Properties;
+
+import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class ElasticsearchWriterBaseTest {
+
+  public static ElasticsearchWriterBase getWriterBase(Config config)
+      throws UnknownHostException {
+    return new ElasticsearchWriterBase(config) {
+      @Override
+      int getDefaultPort() {
+        return 0;
+      }
+    };
+  }
+
+  private void assertFailsToConstruct(Properties props, String testScenario) {
+    assertConstructionExpectation(props, testScenario, false);
+  }
+
+  private void assertSucceedsToConstruct(Properties props, String testScenario) {
+    assertConstructionExpectation(props, testScenario, true);
+  }
+
+  private void assertConstructionExpectation(Properties props,
+      String testScenario,
+      Boolean constructionSuccess) {
+    Config config = ConfigFactory.parseProperties(props);
+    try {
+      ElasticsearchWriterBase writer = getWriterBase(config);
+      if (!constructionSuccess) {
+        Assert.fail("Test Scenario: " + testScenario + ": Writer should not be constructed");
+      }
+    }
+    catch (Exception e) {
+      if (constructionSuccess) {
+        Assert.fail("Test Scenario: " + testScenario + ": Writer should be constructed successfully");
+      }
+    }
+  }
+
+  @Test
+  public void testMinimalRequiredConfiguration()
+      throws UnknownHostException {
+    Properties props = new Properties();
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "test");
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, "test");
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    assertSucceedsToConstruct(props, "minimal configuration");
+  }
+
+  @Test
+  public void testBadIndexNameConfiguration()
+      throws UnknownHostException {
+    Properties props = new Properties();
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, "test");
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    assertFailsToConstruct(props, "index name missing");
+  }
+
+
+
+  @Test
+  public void testBadIndexNameCasingConfiguration()
+      throws UnknownHostException {
+    Properties props = new Properties();
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "Test");
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    assertFailsToConstruct(props, "bad index name casing");
+  }
+
+  @Test
+  public void testBadIndexTypeConfiguration()
+      throws UnknownHostException {
+    Properties props = new Properties();
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "test");
+    props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    assertFailsToConstruct(props, "no index type provided");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java
new file mode 100644
index 0000000..746171c
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.elasticsearch.ElasticsearchTestServer;
+import org.apache.gobblin.test.AvroRecordGenerator;
+import org.apache.gobblin.test.JsonRecordGenerator;
+import org.apache.gobblin.test.PayloadType;
+import org.apache.gobblin.test.RecordTypeGenerator;
+import org.apache.gobblin.test.TestUtils;
+import org.apache.gobblin.writer.AsyncWriterManager;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.BufferedAsyncDataWriter;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.SequentialBasedBatchAccumulator;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class ElasticsearchWriterIntegrationTest {
+
+
+  private ElasticsearchTestServer _esTestServer;
+  private String pid = ManagementFactory.getRuntimeMXBean().getName();
+
+  private List<WriterVariant> variants;
+  private List<RecordTypeGenerator> recordGenerators;
+
+  ElasticsearchWriterIntegrationTest() {
+    variants = ImmutableList.of(new RestWriterVariant(),
+        new TransportWriterVariant());
+    recordGenerators = ImmutableList.of(new AvroRecordGenerator(), new JsonRecordGenerator());
+  }
+
+  @BeforeSuite
+  public void startServers()
+      throws IOException {
+    log.error("{}: Starting Elasticsearch Server", pid);
+    _esTestServer = new ElasticsearchTestServer();
+    _esTestServer.start(60);
+  }
+
+  @AfterSuite
+  public void stopServers() {
+    log.error("{}: Stopping Elasticsearch Server", pid);
+    _esTestServer.stop();
+  }
+
+
+  @Test
+  public void testSingleRecordWrite()
+      throws IOException {
+
+    for (WriterVariant writerVariant : variants) {
+      for (RecordTypeGenerator recordVariant : recordGenerators) {
+
+        String indexName = "posts" + writerVariant.getName().toLowerCase();
+        String indexType = recordVariant.getName();
+        Config config = writerVariant.getConfigBuilder()
+            .setIndexName(indexName)
+            .setIndexType(indexType)
+            .setTypeMapperClassName(recordVariant.getTypeMapperClassName())
+            .setHttpPort(_esTestServer.getHttpPort())
+            .setTransportPort(_esTestServer.getTransportPort())
+            .build();
+
+        TestClient testClient = writerVariant.getTestClient(config);
+        SequentialBasedBatchAccumulator<Object> batchAccumulator = new SequentialBasedBatchAccumulator<>(config);
+        BufferedAsyncDataWriter bufferedAsyncDataWriter = new BufferedAsyncDataWriter(batchAccumulator, writerVariant.getBatchAsyncDataWriter(config));
+
+
+        String id = TestUtils.generateRandomAlphaString(10);
+        Object testRecord = recordVariant.getRecord(id, PayloadType.STRING);
+
+        DataWriter writer = AsyncWriterManager.builder().failureAllowanceRatio(0.0).retriesEnabled(false).config(config)
+            .asyncDataWriter(bufferedAsyncDataWriter).build();
+
+        try {
+          testClient.recreateIndex(indexName);
+          writer.write(testRecord);
+          writer.commit();
+        } finally {
+          writer.close();
+        }
+
+        try {
+          GetResponse response = testClient.get(new GetRequest(indexName, indexType, id));
+          Assert.assertEquals(response.getId(), id, "Response id matches request");
+          Assert.assertEquals(response.isExists(), true, "Document not found");
+        } catch (Exception e) {
+          Assert.fail("Failed to get a response", e);
+        } finally {
+          testClient.close();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testMalformedDocCombinations()
+      throws IOException {
+    for (WriterVariant writerVariant : variants) {
+      for (RecordTypeGenerator recordVariant : recordGenerators) {
+        for (MalformedDocPolicy policy : MalformedDocPolicy.values()) {
+          testMalformedDocs(writerVariant, recordVariant, policy);
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * Sends two docs in a single batch with different field types
+   * Triggers Elasticsearch server to send back an exception due to malformed docs
+   * @throws IOException
+   */
+  public void testMalformedDocs(WriterVariant writerVariant, RecordTypeGenerator recordVariant, MalformedDocPolicy malformedDocPolicy)
+      throws IOException {
+
+    String indexName = writerVariant.getName().toLowerCase();
+    String indexType = (recordVariant.getName()+malformedDocPolicy.name()).toLowerCase();
+    Config config = writerVariant.getConfigBuilder()
+        .setIdMappingEnabled(true)
+        .setIndexName(indexName)
+        .setIndexType(indexType)
+        .setHttpPort(_esTestServer.getHttpPort())
+        .setTransportPort(_esTestServer.getTransportPort())
+        .setTypeMapperClassName(recordVariant.getTypeMapperClassName())
+        .setMalformedDocPolicy(malformedDocPolicy)
+        .build();
+
+
+    TestClient testClient = writerVariant.getTestClient(config);
+    testClient.recreateIndex(indexName);
+
+    String id1=TestUtils.generateRandomAlphaString(10);
+    String id2=TestUtils.generateRandomAlphaString(10);
+
+    Object testRecord1 = recordVariant.getRecord(id1, PayloadType.LONG);
+    Object testRecord2 = recordVariant.getRecord(id2, PayloadType.MAP);
+
+    SequentialBasedBatchAccumulator<Object> batchAccumulator = new SequentialBasedBatchAccumulator<>(config);
+    BatchAsyncDataWriter elasticsearchWriter = writerVariant.getBatchAsyncDataWriter(config);
+    BufferedAsyncDataWriter bufferedAsyncDataWriter = new BufferedAsyncDataWriter(batchAccumulator, elasticsearchWriter);
+
+
+    DataWriter writer = AsyncWriterManager.builder()
+        .failureAllowanceRatio(0.0)
+        .retriesEnabled(false)
+        .config(config)
+        .asyncDataWriter(bufferedAsyncDataWriter)
+        .build();
+
+    try {
+      writer.write(testRecord1);
+      writer.write(testRecord2);
+      writer.commit();
+      writer.close();
+      if (malformedDocPolicy == MalformedDocPolicy.FAIL) {
+        Assert.fail("Should have thrown an exception if malformed doc policy was set to Fail");
+      }
+    }
+    catch (Exception e) {
+      switch (malformedDocPolicy) {
+        case IGNORE:case WARN:{
+          Assert.fail("Should not have failed if malformed doc policy was set to ignore or warn", e);
+          break;
+        }
+        case FAIL: {
+          // pass through
+          break;
+        }
+        default: {
+          throw new RuntimeException("This test does not handle this policyType : " + malformedDocPolicy.toString());
+        }
+      }
+    }
+
+    // Irrespective of policy, first doc should be inserted and second doc should fail
+    int docsIndexed = 0;
+    try {
+      {
+        GetResponse response = testClient.get(new GetRequest(indexName, indexType, id1));
+        Assert.assertEquals(response.getId(), id1, "Response id matches request");
+        System.out.println(malformedDocPolicy + ":" + response.toString());
+        if (response.isExists()) {
+          docsIndexed++;
+        }
+      }
+      {
+        GetResponse response = testClient.get(new GetRequest(indexName, indexType, id2));
+        Assert.assertEquals(response.getId(), id2, "Response id matches request");
+        System.out.println(malformedDocPolicy + ":" + response.toString());
+        if (response.isExists()) {
+          docsIndexed++;
+        }
+      }
+      // only one doc should be found
+      Assert.assertEquals(docsIndexed, 1, "Only one document should be indexed");
+    }
+    catch (Exception e) {
+      Assert.fail("Failed to get a response", e);
+    }
+    finally {
+      testClient.close();
+    }
+  }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java
new file mode 100644
index 0000000..94d8e56
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.testng.Assert;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * A variant that uses the {@link ElasticsearchRestWriter}
+ */
+public class RestWriterVariant implements WriterVariant {
+
+  private ElasticsearchRestWriter _restWriter;
+  @Override
+  public String getName() {
+    return "rest";
+  }
+
+  @Override
+  public ConfigBuilder getConfigBuilder() {
+    return new ConfigBuilder()
+        .setClientType("REST");
+  }
+
+  @Override
+  public BatchAsyncDataWriter getBatchAsyncDataWriter(Config config)
+      throws IOException {
+    _restWriter = new ElasticsearchRestWriter(config);
+    return _restWriter;
+  }
+
+  @Override
+  public TestClient getTestClient(Config config)
+      throws IOException {
+    final ElasticsearchRestWriter restWriter = new ElasticsearchRestWriter(config);
+    final RestHighLevelClient highLevelClient = restWriter.getRestHighLevelClient();
+    return new TestClient() {
+      @Override
+      public GetResponse get(GetRequest getRequest)
+          throws IOException {
+        return highLevelClient.get(getRequest);
+      }
+
+      @Override
+      public void recreateIndex(String indexName)
+          throws IOException {
+        RestClient restClient = restWriter.getRestLowLevelClient();
+        try {
+          restClient.performRequest("DELETE", "/" + indexName);
+        } catch (Exception e) {
+          // ok since index may not exist
+        }
+
+        String indexSettings = "{\"settings\" : {\"index\":{\"number_of_shards\":1,\"number_of_replicas\":1}}}";
+        HttpEntity entity = new StringEntity(indexSettings, ContentType.APPLICATION_JSON);
+
+        Response putResponse = restClient.performRequest("PUT", "/" + indexName, Collections.emptyMap(), entity);
+        Assert.assertEquals(putResponse.getStatusLine().getStatusCode(),200, "Recreate index succeeded");
+      }
+
+      @Override
+      public void close()
+          throws IOException {
+        restWriter.close();
+
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java
new file mode 100644
index 0000000..31f08b1
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+
+
+/**
+ * An interface to describe a functional Elasticsearch client to aid in verification
+ * of test results
+ */
+
+public interface TestClient extends Closeable {
+  GetResponse get(GetRequest getRequest)
+      throws IOException;
+
+  void recreateIndex(String indexName)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java
new file mode 100644
index 0000000..eb28c9a
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.testng.Assert;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * A variant that uses the {@link ElasticsearchTransportClientWriter}
+ */
+public class TransportWriterVariant implements WriterVariant {
+  @Override
+  public String getName() {
+    return "transport";
+  }
+
+  @Override
+  public ConfigBuilder getConfigBuilder() {
+    return new ConfigBuilder()
+        .setClientType("transport");
+  }
+
+  @Override
+  public BatchAsyncDataWriter getBatchAsyncDataWriter(Config config)
+      throws IOException {
+    ElasticsearchTransportClientWriter transportClientWriter = new ElasticsearchTransportClientWriter(config);
+    return transportClientWriter;
+  }
+
+  @Override
+  public TestClient getTestClient(Config config)
+      throws IOException {
+    final ElasticsearchTransportClientWriter transportClientWriter = new ElasticsearchTransportClientWriter(config);
+    final TransportClient transportClient = transportClientWriter.getTransportClient();
+    return new TestClient() {
+      @Override
+      public GetResponse get(GetRequest getRequest)
+          throws IOException {
+        try {
+          return transportClient.get(getRequest).get();
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+
+      @Override
+      public void recreateIndex(String indexName)
+          throws IOException {
+        DeleteIndexRequestBuilder dirBuilder = transportClient.admin().indices().prepareDelete(indexName);
+        try {
+          DeleteIndexResponse diResponse = dirBuilder.execute().actionGet();
+        } catch (IndexNotFoundException ie) {
+          System.out.println("Index not found... that's ok");
+        }
+
+        CreateIndexRequestBuilder cirBuilder = transportClient.admin().indices().prepareCreate(indexName);
+        CreateIndexResponse ciResponse = cirBuilder.execute().actionGet();
+        Assert.assertTrue(ciResponse.isAcknowledged(), "Create index succeeeded");
+      }
+
+      @Override
+      public void close()
+          throws IOException {
+        transportClientWriter.close();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java
new file mode 100644
index 0000000..581ec2e
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * An interface to implement Writer variants to enable generic testing
+ */
+public interface WriterVariant {
+
+  String getName();
+
+  ConfigBuilder getConfigBuilder();
+
+  BatchAsyncDataWriter getBatchAsyncDataWriter(Config config)
+      throws IOException;
+
+  TestClient getTestClient(Config config)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java
new file mode 100644
index 0000000..29433f3
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Collections;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper;
+
+
+/**
+ * A generator of Avro records of type {@link GenericRecord}
+ */
+public class AvroRecordGenerator implements RecordTypeGenerator<GenericRecord> {
+  @Override
+  public String getName() {
+    return "avro";
+  }
+
+  @Override
+  public String getTypeMapperClassName() {
+    return AvroGenericRecordTypeMapper.class.getCanonicalName();
+  }
+
+  @Override
+  public GenericRecord getRecord(String id, PayloadType payloadType) {
+    GenericRecord record = getTestAvroRecord(id, payloadType);
+    return record;
+  }
+
+  static GenericRecord getTestAvroRecord(String identifier, PayloadType payloadType) {
+    Schema dataRecordSchema =
+        SchemaBuilder.record("Data").fields().name("data").type().bytesType().noDefault().name("flags").type().intType()
+            .noDefault().endRecord();
+
+    Schema schema;
+    Object payloadValue;
+    switch (payloadType) {
+      case STRING: {
+        schema = SchemaBuilder.record("TestRecord").fields()
+            .name("id").type().stringType().noDefault()
+            .name("key").type().stringType().noDefault()
+            .name("data").type(dataRecordSchema).noDefault()
+            .endRecord();
+        payloadValue = TestUtils.generateRandomAlphaString(20);
+        break;
+      }
+      case LONG: {
+        schema = SchemaBuilder.record("TestRecord").fields()
+            .name("id").type().stringType().noDefault()
+            .name("key").type().longType().noDefault()
+            .name("data").type(dataRecordSchema).noDefault()
+            .endRecord();
+        payloadValue = TestUtils.generateRandomLong();
+        break;
+      }
+      case MAP: {
+        schema = SchemaBuilder.record("TestRecord").fields()
+            .name("id").type().stringType().noDefault()
+            .name("key").type().map().values().stringType().noDefault()
+            .name("data").type(dataRecordSchema).noDefault()
+            .endRecord();
+        payloadValue = Collections.EMPTY_MAP;
+        break;
+      }
+      default: {
+        throw new RuntimeException("Do not know how to handle this time");
+      }
+    }
+
+    GenericData.Record testRecord = new GenericData.Record(schema);
+
+    String testContent = "hello world";
+
+    GenericData.Record dataRecord = new GenericData.Record(dataRecordSchema);
+    dataRecord.put("data", ByteBuffer.wrap(testContent.getBytes(Charset.forName("UTF-8"))));
+    dataRecord.put("flags", 0);
+
+    testRecord.put("key", payloadValue);
+    testRecord.put("data", dataRecord);
+    testRecord.put("id", identifier);
+    return testRecord;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java
new file mode 100644
index 0000000..7aea277
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.test;
+
+import java.util.Collections;
+
+import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+
+
+/**
+ * A generator of {@link JsonElement} records
+ */
+public class JsonRecordGenerator implements RecordTypeGenerator<JsonElement> {
+  private final Gson gson = new Gson();
+
+  @Override
+  public String getName() {
+    return "json";
+  }
+
+  @Override
+  public String getTypeMapperClassName() {
+    return JsonTypeMapper.class.getCanonicalName();
+  }
+
+  static class TestObject<T> {
+    private String id;
+    private T key;
+
+    TestObject(String id, T payload) {
+      this.id = id;
+      this.key = payload;
+    }
+  }
+
+  @Override
+  public JsonElement getRecord(String id, PayloadType payloadType) {
+    Object testObject;
+    switch (payloadType) {
+      case STRING: {
+        testObject = new TestObject(id, TestUtils.generateRandomAlphaString(20));
+        break;
+      }
+      case LONG: {
+        testObject = new TestObject(id, TestUtils.generateRandomLong());
+        break;
+      }
+      case MAP: {
+        testObject = new TestObject(id, Collections.EMPTY_MAP);
+        break;
+      }
+      default:
+        throw new RuntimeException("Do not know how to handle this type of payload");
+    }
+    JsonElement jsonElement = gson.toJsonTree(testObject);
+    return jsonElement;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java
new file mode 100644
index 0000000..d793c47
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.test;
+
+/**
+ * An enumeration of Payload types
+ * Used to configure the record in tests
+ */
+public enum PayloadType {
+  STRING,
+  LONG,
+  MAP
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java
new file mode 100644
index 0000000..00c798f
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.test;
+
+/**
+ * An interface to describe a generator of records
+ */
+public interface RecordTypeGenerator<T> {
+  /**
+   * The name of this record type
+   * @return
+   */
+  String getName();
+
+  /**
+   * A {@link org.apache.gobblin.elasticsearch.typemapping.TypeMapper} that can work with
+   * records of this type
+   * @return
+   */
+  String getTypeMapperClassName();
+
+  /**
+   * Generate a record with the provided characteristics
+   * @param identifier
+   * @param payloadType
+   * @return a record of the type T
+   */
+  T getRecord(String identifier, PayloadType payloadType);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java b/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java
index ab4bffa..9db3782 100644
--- a/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java
+++ b/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.eventhub.writer;
 import java.io.IOException;
 
 import org.apache.gobblin.writer.BytesBoundedBatch;
+import org.apache.gobblin.writer.LargeMessagePolicy;
+import org.apache.gobblin.writer.RecordTooLargeException;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -28,41 +30,44 @@ import org.apache.gobblin.writer.WriteCallback;
 public class EventhubBatchTest {
 
   @Test
-  public void testBatchWithLargeRecord() throws IOException {
+  public void testBatchWithLargeRecord()
+      throws IOException, RecordTooLargeException {
     // Assume memory size has only 2 bytes
     BytesBoundedBatch batch = new BytesBoundedBatch(8, 3000);
 
     String record = "abcdefgh";
 
     // Record is larger than the memory size limit, the first append should fail
-    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY));
+    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, LargeMessagePolicy.DROP));
 
     // The second append should still fail
-    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY));
+    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, LargeMessagePolicy.DROP));
   }
 
   @Test
-  public void testBatch() throws IOException {
+  public void testBatch()
+      throws IOException, RecordTooLargeException {
     // Assume memory size has only 200 bytes
     BytesBoundedBatch batch = new BytesBoundedBatch(200, 3000);
 
     // Add additional 15 bytes overhead, total size is 27 bytes
     String record = "abcdefgh";
 
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
+    LargeMessagePolicy policy = LargeMessagePolicy.DROP;
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
 
     // Batch has room for 8th record
-    Assert.assertEquals(batch.hasRoom(record), true);
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
+    Assert.assertEquals(batch.hasRoom(record, policy), true);
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
 
     // Batch has no room for 9th record
-    Assert.assertEquals(batch.hasRoom(record), false);
-    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY));
+    Assert.assertEquals(batch.hasRoom(record, policy), false);
+    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java
index a583198..68c79e9 100644
--- a/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java
+++ b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java
@@ -45,6 +45,27 @@ public class TestUtils {
     return messageBytes;
   }
 
+  private static final char[] alphas = new char[26];
+
+  public static Long generateRandomLong() {
+    return rng.nextLong();
+  }
+
+  static {
+    char ch = 'a';
+    for (int i = 0; i < 26; i++) {
+      alphas[i] = ch++;
+    }
+  }
+
+  public static String generateRandomAlphaString(int stringLength) {
+    char[] newString = new char[stringLength];
+    for (int i = 0; i < stringLength; ++i)
+    {
+      newString[i] = alphas[rng.nextInt(26)];
+    }
+    return new String(newString);
+  }
 
   /**
    * TODO: Currently generates a static schema avro record.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gradle/scripts/globalDependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/globalDependencies.gradle b/gradle/scripts/globalDependencies.gradle
index d1d2e03..d64db67 100644
--- a/gradle/scripts/globalDependencies.gradle
+++ b/gradle/scripts/globalDependencies.gradle
@@ -23,20 +23,22 @@ subprojects {
     configurations {
       compile
       dependencies {
-        compile(externalDependency.hadoopCommon) {
-          exclude module: 'servlet-api'
-        }
-        compile externalDependency.hadoopClientCore
-        compile externalDependency.hadoopAnnotations
-        if (project.name.equals('gobblin-runtime') || project.name.equals('gobblin-test')) {
-          compile externalDependency.hadoopClientCommon
-        }
-        compile(externalDependency.guava) {
-          force = true
-        }
-        compile(externalDependency.commonsCodec) {
-          force = true
-        }
+        if (!project.name.contains('gobblin-elasticsearch-deps')) {
+          compile(externalDependency.hadoopCommon) {
+            exclude module: 'servlet-api'
+          }
+          compile externalDependency.hadoopClientCore
+          compile externalDependency.hadoopAnnotations
+          if (project.name.equals('gobblin-runtime') || project.name.equals('gobblin-test')) {
+            compile externalDependency.hadoopClientCommon
+          }
+          compile(externalDependency.guava) {
+              force = true
+          }
+      }
+          compile(externalDependency.commonsCodec) {
+            force = true
+          }
 
         // Required to add JDK's tool jar, which is required to run byteman tests.
         testCompile (files(((URLClassLoader) ToolProvider.getSystemToolClassLoader()).getURLs()))


[2/2] incubator-gobblin git commit: [GOBBLIN-17] Add Elasticsearch writer (rest + transport)

Posted by ab...@apache.org.
[GOBBLIN-17] Add Elasticsearch writer (rest + transport)

Closes #2419 from shirshanka/elastic


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f1bc746c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f1bc746c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f1bc746c

Branch: refs/heads/master
Commit: f1bc746ca50cffa1247c00b6c5bdd34b7321198d
Parents: ef438c8
Author: Shirshanka Das <sd...@linkedin.com>
Authored: Fri Aug 31 22:31:21 2018 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Fri Aug 31 22:31:45 2018 -0700

----------------------------------------------------------------------
 .../gobblin/writer/AsyncWriterManager.java      |   3 +
 .../java/org/apache/gobblin/writer/Batch.java   |  22 +-
 .../gobblin/writer/BufferedAsyncDataWriter.java |   4 +-
 .../gobblin/writer/BytesBoundedBatch.java       |  10 +-
 .../gobblin/writer/LargeMessagePolicy.java      |  26 ++
 .../gobblin/writer/RecordTooLargeException.java |  20 ++
 .../writer/SequentialBasedBatchAccumulator.java |  65 +++--
 .../gobblin-flavor-standard.gradle              |   1 +
 .../src/main/resources/wikipedia-elastic.conf   |  64 +++++
 .../gobblin-elasticsearch-deps/build.gradle     |  50 ++++
 .../gobblin-elasticsearch/build.gradle          |  76 ++++++
 .../scripts/install_test_deps.sh                |  40 +++
 .../scripts/uninstall_test_deps.sh              |  23 ++
 .../AvroGenericRecordSerializer.java            |  80 ++++++
 .../AvroGenericRecordTypeMapper.java            |  71 ++++++
 .../typemapping/FieldMappingException.java      |  35 +++
 .../typemapping/GsonJsonSerializer.java         |  52 ++++
 .../typemapping/JsonSerializer.java             |  30 +++
 .../typemapping/JsonTypeMapper.java             |  56 +++++
 .../typemapping/SerializationException.java     |  31 +++
 .../elasticsearch/typemapping/TypeMapper.java   |  36 +++
 .../writer/ElasticsearchDataWriterBuilder.java  |  83 +++++++
 .../writer/ElasticsearchRestWriter.java         | 232 ++++++++++++++++++
 .../ElasticsearchTransportClientWriter.java     | 118 +++++++++
 .../writer/ElasticsearchWriterBase.java         | 168 +++++++++++++
 .../ElasticsearchWriterConfigurationKeys.java   |  71 ++++++
 .../elasticsearch/writer/ExceptionLogger.java   |  26 ++
 .../writer/FutureCallbackHolder.java            | 193 +++++++++++++++
 .../writer/MalformedDocPolicy.java              |  26 ++
 .../elasticsearch/ElasticsearchTestServer.java  | 217 +++++++++++++++++
 .../ElasticsearchTestServerTest.java            |  50 ++++
 .../elasticsearch/writer/ConfigBuilder.java     |  72 ++++++
 .../ElasticsearchTransportClientWriterTest.java |  54 +++++
 .../writer/ElasticsearchWriterBaseTest.java     | 113 +++++++++
 .../ElasticsearchWriterIntegrationTest.java     | 243 +++++++++++++++++++
 .../elasticsearch/writer/RestWriterVariant.java |  97 ++++++++
 .../elasticsearch/writer/TestClient.java        |  37 +++
 .../writer/TransportWriterVariant.java          |  96 ++++++++
 .../elasticsearch/writer/WriterVariant.java     |  40 +++
 .../gobblin/test/AvroRecordGenerator.java       | 104 ++++++++
 .../gobblin/test/JsonRecordGenerator.java       |  75 ++++++
 .../org/apache/gobblin/test/PayloadType.java    |  27 +++
 .../gobblin/test/RecordTypeGenerator.java       |  43 ++++
 .../eventhub/writer/EventhubBatchTest.java      |  35 +--
 .../java/org/apache/gobblin/test/TestUtils.java |  21 ++
 gradle/scripts/globalDependencies.gradle        |  30 +--
 46 files changed, 3005 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
index 2be89c6..a599753 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
@@ -340,6 +340,8 @@ public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWrite
               .update(currTime - attempt.getPrevAttemptTimestampNanos(), TimeUnit.NANOSECONDS);
         }
         if (attempt.attemptNum <= AsyncWriterManager.this.numRetries) { // attempts must == numRetries + 1
+          log.debug("Attempt {} had failure: {}; re-enqueueing record: {}", attempt.attemptNum, throwable.getMessage(),
+              attempt.getRecord().toString());
           attempt.incAttempt();
           attempt.setPrevAttemptFailure(throwable);
           AsyncWriterManager.this.retryQueue.get().add(attempt);
@@ -391,6 +393,7 @@ public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWrite
           Attempt attempt = this.retryQueue.take();
           if (attempt != null) {
             maybeSleep(attempt.getPrevAttemptTimestampNanos());
+            log.debug("Retry thread will retry record: {}", attempt.getRecord().toString());
             attemptWrite(attempt);
           }
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
index ff16590..faf815c 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java
@@ -127,17 +127,18 @@ public abstract class Batch<D>{
   /**
    * A method to check if the batch has the room to add a new record
    *  @param record: record needs to be added
+   *  @param largeMessagePolicy: the policy that is in effect for large messages
    *  @return Indicates if this batch still have enough space to hold a new record
    */
-  public abstract boolean hasRoom (D record);
+  public abstract boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy);
 
   /**
    * Add a record to this batch
    * <p>
    *   Implementation of this method should always ensure the record can be added successfully
-   *   The contract between {@link Batch#tryAppend(Object, WriteCallback)} and this method is this method
+   *   The contract between {@link Batch#tryAppend(Object, WriteCallback, LargeMessagePolicy)} and this method is this method
    *   is responsible for adding record to internal batch memory and the check for the room space is performed
-   *   by {@link Batch#hasRoom(Object)}. All the potential issues for adding a record should
+   *   by {@link Batch#hasRoom(Object, LargeMessagePolicy)}. All the potential issues for adding a record should
    *   already be resolved before this method is invoked.
    * </p>
    *
@@ -162,14 +163,19 @@ public abstract class Batch<D>{
    *
    *   @param record : record needs to be added
    *   @param callback : A callback which will be invoked when the whole batch gets sent and acknowledged
+   *   @param largeMessagePolicy : the {@link LargeMessagePolicy} that is in effect for this batch
    *   @return A future object which contains {@link RecordMetadata}
    */
-  public Future<RecordMetadata> tryAppend(D record, WriteCallback callback) {
-    if (!hasRoom(record)) {
-      LOG.debug ("Cannot add " + record + " to previous batch because the batch already has " + getCurrentSizeInByte() + " bytes");
+  public Future<RecordMetadata> tryAppend(D record, WriteCallback callback, LargeMessagePolicy largeMessagePolicy)
+      throws RecordTooLargeException {
+    if (!hasRoom(record, largeMessagePolicy)) {
+      LOG.debug ("Cannot add {} to previous batch because the batch already has {} bytes",
+          record.toString(), getCurrentSizeInByte());
+      if (largeMessagePolicy == LargeMessagePolicy.FAIL) {
+        throw new RecordTooLargeException();
+      }
       return null;
     }
-
     this.append(record);
     thunks.add(new Thunk(callback, getRecordSizeInByte(record)));
     RecordFuture future = new RecordFuture(latch, recordCount);
@@ -178,7 +184,9 @@ public abstract class Batch<D>{
   }
 
   public void await() throws InterruptedException{
+    LOG.debug("Batch {} waiting for {} records", this.id, this.recordCount);
     this.latch.await();
+    LOG.debug("Batch {} done with {} records", this.id, this.recordCount);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
index ceaffec..87039b6 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java
@@ -39,7 +39,7 @@ import org.apache.gobblin.annotation.Alpha;
  * @param <D> data record type
  */
 @Alpha
-public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
+public class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
 
   private RecordProcessor<D> processor;
   private BatchAccumulator<D> accumulator;
@@ -136,7 +136,7 @@ public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
       return new WriteCallback<Object>() {
         @Override
         public void onSuccess(WriteResponse writeResponse) {
-          LOG.info ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
+          LOG.debug ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
           batch.onSuccess(writeResponse);
           batch.done();
           accumulator.deallocate(batch);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
index ef63882..7b6b4dc 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java
@@ -62,7 +62,11 @@ public class BytesBoundedBatch<D> extends Batch<D>{
       records.add(record);
     }
 
-    boolean hasRoom (D record) {
+    boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy) {
+      if (records.isEmpty() && largeMessagePolicy == LargeMessagePolicy.ATTEMPT) {
+        // there is always space for one record, no matter how big :)
+          return true;
+      }
       long recordLen = BytesBoundedBatch.this.getInternalSize(record);
       return (byteSize + recordLen) <= BytesBoundedBatch.this.memSizeLimit;
     }
@@ -80,8 +84,8 @@ public class BytesBoundedBatch<D> extends Batch<D>{
     return memory.getRecords();
   }
 
-  public boolean hasRoom (D object) {
-    return memory.hasRoom(object);
+  public boolean hasRoom (D object, LargeMessagePolicy largeMessagePolicy) {
+    return memory.hasRoom(object, largeMessagePolicy);
   }
 
   public void append (D object) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java
new file mode 100644
index 0000000..28ca949
--- /dev/null
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+/**
+ * Describes how single messages that are larger than a batch message limit should be treated
+ */
+public enum LargeMessagePolicy {
+  DROP, // drop (and log) messages that exceed the threshold
+  ATTEMPT, // attempt to deliver messages that exceed the threshold
+  FAIL // throw an error when this happens
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java
new file mode 100644
index 0000000..845e6a8
--- /dev/null
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.writer;
+
+public class RecordTooLargeException extends Exception {
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
index 9b7a608..58b0942 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java
@@ -43,14 +43,16 @@ import org.apache.gobblin.util.ConfigUtils;
  * keeps in the deque until a TTL is expired.
  */
 
-public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {
+public class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {
 
+  private static final LargeMessagePolicy DEFAULT_LARGE_MESSAGE_POLICY = LargeMessagePolicy.FAIL;
   private Deque<BytesBoundedBatch<D>> dq = new LinkedList<>();
   private IncompleteRecordBatches incomplete = new IncompleteRecordBatches();
   private final long batchSizeLimit;
   private final long memSizeLimit;
   private final double tolerance = 0.95;
   private final long expireInMilliSecond;
+  private final LargeMessagePolicy largeMessagePolicy;
   private static final Logger LOG = LoggerFactory.getLogger(SequentialBasedBatchAccumulator.class);
 
   private final ReentrantLock dqLock = new ReentrantLock();
@@ -63,24 +65,31 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
   }
 
   public SequentialBasedBatchAccumulator(Properties properties) {
-    Config config = ConfigUtils.propertiesToConfig(properties);
-    this.batchSizeLimit = ConfigUtils.getLong(config, Batch.BATCH_SIZE,
-            Batch.BATCH_SIZE_DEFAULT);
-
-    this.expireInMilliSecond = ConfigUtils.getLong(config, Batch.BATCH_TTL,
-            Batch.BATCH_TTL_DEFAULT);
-
-    this.capacity = ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY,
-            Batch.BATCH_QUEUE_CAPACITY_DEFAULT);
+    this(ConfigUtils.propertiesToConfig(properties));
+  }
 
-    this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit);
+  public SequentialBasedBatchAccumulator(Config config) {
+    this(ConfigUtils.getLong(config, Batch.BATCH_SIZE,
+            Batch.BATCH_SIZE_DEFAULT),
+        ConfigUtils.getLong(config, Batch.BATCH_TTL,
+            Batch.BATCH_TTL_DEFAULT),
+        ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY,
+            Batch.BATCH_QUEUE_CAPACITY_DEFAULT));
   }
 
   public SequentialBasedBatchAccumulator(long batchSizeLimit, long expireInMilliSecond, long capacity) {
+    this(batchSizeLimit, expireInMilliSecond, capacity, DEFAULT_LARGE_MESSAGE_POLICY);
+  }
+
+  public SequentialBasedBatchAccumulator(long batchSizeLimit,
+      long expireInMilliSecond,
+      long capacity,
+      LargeMessagePolicy largeMessagePolicy) {
     this.batchSizeLimit = batchSizeLimit;
     this.expireInMilliSecond = expireInMilliSecond;
     this.capacity = capacity;
     this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit);
+    this.largeMessagePolicy = largeMessagePolicy;
   }
 
   public long getNumOfBatches () {
@@ -101,7 +110,12 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
     try {
       BytesBoundedBatch last = dq.peekLast();
       if (last != null) {
-        Future<RecordMetadata> future = last.tryAppend(record, callback);
+        Future<RecordMetadata> future = null;
+        try {
+          future = last.tryAppend(record, callback, this.largeMessagePolicy);
+        } catch (RecordTooLargeException e) {
+          // Ok if the record was too large for the current batch
+        }
         if (future != null) {
           return future;
         }
@@ -110,12 +124,18 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
       // Create a new batch because previous one has no space
       BytesBoundedBatch batch = new BytesBoundedBatch(this.memSizeLimit, this.expireInMilliSecond);
       LOG.debug("Batch " + batch.getId() + " is generated");
-      Future<RecordMetadata> future = batch.tryAppend(record, callback);
+      Future<RecordMetadata> future = null;
+      try {
+        future = batch.tryAppend(record, callback, this.largeMessagePolicy);
+      } catch (RecordTooLargeException e) {
+        // If a new batch also wasn't able to accomodate the new message
+        throw new RuntimeException("Failed due to a message that was too large", e);
+      }
 
-      // Even single record can exceed the batch size limit
-      // Ignore the record because Eventhub can only accept payload less than 256KB
+      // The future might be null, since the largeMessagePolicy might be set to DROP
       if (future == null) {
-        LOG.error("Batch " + batch.getId() + " is marked as complete because it contains a huge record: "
+        assert largeMessagePolicy.equals(LargeMessagePolicy.DROP);
+        LOG.error("Batch " + batch.getId() + " is silently marked as complete, dropping a huge record: "
                 + record);
         future = Futures.immediateFuture(new RecordMetadata(0));
         callback.onSuccess(WriteResponse.EMPTY);
@@ -124,6 +144,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
 
       // if queue is full, we should not add more
       while (dq.size() >= this.capacity) {
+        LOG.debug("Accumulator size {} is greater than capacity {}, waiting", dq.size(), this.capacity);
         this.notFull.await();
       }
       dq.addLast(batch);
@@ -187,7 +208,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
         return dq.poll();
       } else {
           while (dq.size() == 0) {
-            LOG.info ("ready to sleep because of queue is empty");
+            LOG.debug ("ready to sleep because of queue is empty");
             SequentialBasedBatchAccumulator.this.notEmpty.await();
             if (SequentialBasedBatchAccumulator.this.isClosed()) {
               return dq.poll();
@@ -203,7 +224,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
 
           if (dq.size() == 1) {
             if (dq.peekFirst().isTTLExpire()) {
-              LOG.info ("Batch " + dq.peekFirst().getId() + " is expired");
+              LOG.debug ("Batch " + dq.peekFirst().getId() + " is expired");
               BytesBoundedBatch candidate = dq.poll();
               SequentialBasedBatchAccumulator.this.notFull.signal();
               return candidate;
@@ -240,12 +261,16 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato
   public void flush() {
     try {
       ArrayList<Batch> batches = this.incomplete.all();
-      LOG.info ("flush on {} batches", batches.size());
+      int numOutstandingRecords = 0;
+      for (Batch batch: batches) {
+        numOutstandingRecords += batch.getRecords().size();
+      }
+      LOG.debug ("Flush called on {} batches with {} records total", batches.size(), numOutstandingRecords);
       for (Batch batch: batches) {
         batch.await();
       }
     } catch (Exception e) {
-      LOG.info ("Error happens when flushing");
+      LOG.error ("Error happened while flushing batches");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-distribution/gobblin-flavor-standard.gradle
----------------------------------------------------------------------
diff --git a/gobblin-distribution/gobblin-flavor-standard.gradle b/gobblin-distribution/gobblin-flavor-standard.gradle
index c2061a5..2f544ca 100644
--- a/gobblin-distribution/gobblin-flavor-standard.gradle
+++ b/gobblin-distribution/gobblin-flavor-standard.gradle
@@ -21,4 +21,5 @@ dependencies {
   compile project(':gobblin-modules:gobblin-crypto-provider')
   compile project(':gobblin-modules:gobblin-kafka-08')
   compile project(':gobblin-modules:google-ingestion')
+  compile project(':gobblin-modules:gobblin-elasticsearch') 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-example/src/main/resources/wikipedia-elastic.conf
----------------------------------------------------------------------
diff --git a/gobblin-example/src/main/resources/wikipedia-elastic.conf b/gobblin-example/src/main/resources/wikipedia-elastic.conf
new file mode 100644
index 0000000..9db386e
--- /dev/null
+++ b/gobblin-example/src/main/resources/wikipedia-elastic.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A sample pull file that copies an input Kafka topic and produces to an output Kafka topic
+# with sampling
+job {
+  name=PullFromWikipediaToElasticSearch
+  group=Wikipedia
+  description=Pull from Wikipedia and write to ElasticSearch
+}
+
+task.maxretries=0
+
+source {
+  class=org.apache.gobblin.example.wikipedia.WikipediaSource
+  page.titles="Wikipedia:Sandbox"
+  revisions.cnt=5
+}
+
+wikipedia {
+   api.rooturl="https://en.wikipedia.org/w/api.php"
+  avro.schema="{\"namespace\": \"example.wikipedia.avro\",\"type\": \"record\",\"name\": \"WikipediaArticle\",\"fields\": [{\"name\": \"revid\", \"type\": [\"double\", \"null\"]},{\"name\": \"pageid\", \"type\": [\"double\", \"null\"]},{\"name\": \"title\", \"type\": [\"string\", \"null\"]},{\"name\": \"user\", \"type\": [\"string\", \"null\"]},{\"name\": \"anon\", \"type\": [\"string\", \"null\"]},{\"name\": \"userid\",  \"type\": [\"double\", \"null\"]},{\"name\": \"timestamp\", \"type\": [\"string\", \"null\"]},{\"name\": \"size\",  \"type\": [\"double\", \"null\"]},{\"name\": \"contentformat\",  \"type\": [\"string\", \"null\"]},{\"name\": \"contentmodel\",  \"type\": [\"string\", \"null\"]},{\"name\": \"content\", \"type\": [\"string\", \"null\"]}]}"
+} 
+converter.classes=org.apache.gobblin.example.wikipedia.WikipediaConverter
+extract.namespace=org.apache.gobblin.example.wikipedia
+
+writer {
+  builder.class=org.apache.gobblin.elasticsearch.writer.ElasticsearchDataWriterBuilder
+  elasticsearch {
+    client.type=REST
+    index.name=wikipedia-test
+    index.type=docs
+    #hosts=hostname
+    #ssl {
+    #  enabled=true
+    #  keystoreType=pkcs12
+    #  keystorePassword=change_me
+    #  keystoreLocation=/path/to/.p12 file
+    #  truststoreType=jks
+    #  truststoreLocation=/path/to/cacerts
+    #  truststorePassword=changeme
+    #}
+    typeMapperClass=org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper
+    useIdFromData=false  # change to true if you want to use a field from the record as the id field
+    #idFieldName=id      # change to the field of the record that you want to use as the id of the document
+  }
+}
+
+data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch-deps/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch-deps/build.gradle b/gobblin-modules/gobblin-elasticsearch-deps/build.gradle
new file mode 100644
index 0000000..35e9a36
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch-deps/build.gradle
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+buildscript {
+    repositories {
+        jcenter()
+    }
+    dependencies {
+        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
+    }
+}
+
+apply plugin: 'com.github.johnrengelman.shadow'
+apply plugin: 'java'
+
+dependencies {
+  compile "org.elasticsearch.client:transport:5.6.8"
+  compile "org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.8"
+  compile "com.google.guava:guava:18.0"
+}
+
+
+configurations {
+ compile {
+    exclude group: "org.apache.hadoop"
+    exclude group: "com.sun.jersey.contribs"
+  }
+}
+
+shadowJar {
+  zip64 true
+  relocate 'com.google.common', 'shadow.gobblin.elasticsearch.com.google.common'
+}
+
+ext.classification="library"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/build.gradle b/gobblin-modules/gobblin-elasticsearch/build.gradle
new file mode 100644
index 0000000..2d624b2
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/build.gradle
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'java'
+
+dependencies {
+  compile project(":gobblin-api")
+  compile project(":gobblin-core-base")
+  compile project(":gobblin-utility")
+  compile project(":gobblin-metrics-libs:gobblin-metrics")
+  compile project(path: ":gobblin-modules:gobblin-elasticsearch-deps", configuration:"shadow")
+
+  compile "org.apache.logging.log4j:log4j-to-slf4j:2.7"
+  compile "org.slf4j:slf4j-api:1.7.21"
+  compile externalDependency.avro
+  compile externalDependency.jacksonCore
+  compile externalDependency.jacksonMapper
+  compile externalDependency.commonsHttpClient
+  compile externalDependency.commonsPool
+  compile externalDependency.commonsLang3
+  compile externalDependency.slf4j
+  compile externalDependency.httpclient
+  compile externalDependency.httpcore
+  compile externalDependency.lombok
+  compile externalDependency.metricsCore
+  compile externalDependency.typesafeConfig
+  compile externalDependency.findBugsAnnotations
+
+  testCompile project(":gobblin-runtime")
+  testCompile project(":gobblin-test-utils")
+  testCompile externalDependency.jsonAssert
+  testCompile externalDependency.mockito
+  testCompile externalDependency.testng
+}
+
+task installTestDependencies(type:Exec) {
+  workingDir "${project.rootDir}/gobblin-modules/gobblin-elasticsearch/"
+  commandLine './scripts/install_test_deps.sh'
+}
+
+task uninstallTestDependencies(type: Exec) {
+  workingDir "${project.rootDir}/gobblin-modules/gobblin-elasticsearch/"
+  commandLine './scripts/uninstall_test_deps.sh'
+  
+}
+
+test.dependsOn installTestDependencies
+test.finalizedBy uninstallTestDependencies
+
+configurations {
+  compile {
+    transitive = false 
+  }
+}
+
+test {
+  workingDir rootProject.rootDir
+  maxParallelForks = 4
+}
+
+
+ext.classification="library"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh b/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh
new file mode 100755
index 0000000..48324da
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+TARGET_DIR="test-elasticsearch"
+ES_VERSION=5.6.8
+ES_DIR=${TARGET_DIR}/elasticsearch-${ES_VERSION}
+echo ${TARGET_DIR}
+mkdir -p ${TARGET_DIR}
+
+
+ES_TAR=${TARGET_DIR}/elasticsearch-${ES_VERSION}.tar.gz
+ES_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz
+echo ${ES_URL}
+echo ${ES_TAR}
+if [ -d $ES_DIR ];
+then
+  echo "Skipping download since version already found at ${ES_DIR}"
+  echo "Cleaning up directory"
+  rm -rf ${TARGET_DIR}/elasticsearch-${ES_VERSION}
+else
+  echo "$ES_DIR does not exist, downloading"
+  curl -o ${ES_TAR} ${ES_URL}
+fi
+tar -xzf ${ES_TAR} -C ${TARGET_DIR}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh b/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh
new file mode 100755
index 0000000..db79f86
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+TARGET_DIR="test-elasticsearch"
+ES_VERSION=5.6.8
+ES_DIR=${TARGET_DIR}/elasticsearch-${ES_VERSION}
+rm -rf ${TARGET_DIR}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java
new file mode 100644
index 0000000..5242202
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link JsonSerializer} for {@link GenericRecord} objects.
+ */
+@Slf4j
+public class AvroGenericRecordSerializer implements JsonSerializer<GenericRecord> {
+
+  private final ByteArrayOutputStream byteArrayOutputStream;
+  private final DataOutputStream out;
+  private final GenericDatumWriter<GenericRecord> writer;
+  private final Closer closer;
+
+
+  public AvroGenericRecordSerializer() {
+    this.closer =Closer.create();
+    this.byteArrayOutputStream = new ByteArrayOutputStream();
+    this.out = this.closer.register(new DataOutputStream(this.byteArrayOutputStream));
+    this.writer = new GenericDatumWriter<GenericRecord>();
+  }
+
+  @Override
+  public void configure(Config config) {
+
+  }
+
+  @Override
+  public synchronized byte[] serializeToJson(GenericRecord serializable)
+      throws SerializationException {
+    try {
+      /**
+       * We use the toString method of Avro to flatten the JSON for optional nullable types.
+       * Otherwise the JSON has an additional level of nesting to encode the type.
+       * e.g. "id": {"string": "id-value"} versus "id": "id-value"
+       * See {@link: https://issues.apache.org/jira/browse/AVRO-1582} for a good discussion on this.
+       */
+      String serialized = serializable.toString();
+      return serialized.getBytes(Charset.forName("UTF-8"));
+
+    } catch (Exception exception) {
+      throw new SerializationException("Could not serializeToJson Avro record", exception);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    this.closer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java
new file mode 100644
index 0000000..0586f3c
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.IOException;
+
+import org.apache.avro.generic.GenericRecord;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A TypeMapper for Avro GenericRecords.
+ */
+@Slf4j
+public class AvroGenericRecordTypeMapper implements TypeMapper<GenericRecord> {
+
+  private final JsonSerializer<GenericRecord> serializer;
+  private final Closer closer;
+
+  public AvroGenericRecordTypeMapper() {
+    this.closer =Closer.create();
+    this.serializer = this.closer.register(new AvroGenericRecordSerializer());
+  }
+
+  @Override
+  public void configure(Config config) {
+    this.serializer.configure(config);
+    log.info("AvroGenericRecordTypeMapper successfully configured");
+  }
+
+  @Override
+  public JsonSerializer<GenericRecord> getSerializer() {
+    return this.serializer;
+  }
+
+  @Override
+  public String getValue(String fieldName, GenericRecord record)
+      throws FieldMappingException {
+    try {
+      Object idValue = record.get(fieldName);
+      return idValue.toString();
+    }
+    catch (Exception e) {
+      throw new FieldMappingException("Could not find field " + fieldName, e);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    this.closer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java
new file mode 100644
index 0000000..781f918
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+/**
+ * An exception for type mapping errors during field-based access
+ */
+public class FieldMappingException extends Exception {
+
+  public FieldMappingException(Exception e) {
+    super(e);
+  }
+
+  public FieldMappingException(String message, Exception e) {
+    super(message, e);
+  }
+
+  public FieldMappingException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java
new file mode 100644
index 0000000..d44986c
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+
+
+/**
+ * A Gson based Json Serializer
+ */
+public class GsonJsonSerializer implements JsonSerializer<Object> {
+  private final Gson _gson = new Gson();
+
+  @Override
+  public void configure(Config config) {
+
+  }
+
+  @Override
+  public byte[] serializeToJson(Object serializable)
+      throws SerializationException {
+    String jsonString = _gson.toJson(serializable);
+    try {
+      return jsonString.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new SerializationException(e);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java
new file mode 100644
index 0000000..41f2885
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.Closeable;
+
+import com.typesafe.config.Config;
+
+
+public interface JsonSerializer<T> extends Closeable {
+
+  void configure(Config config);
+
+  byte[] serializeToJson(T serializable) throws SerializationException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java
new file mode 100644
index 0000000..8491147
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.IOException;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.typesafe.config.Config;
+
+
+public class JsonTypeMapper implements TypeMapper<JsonElement> {
+
+  private final JsonSerializer serializer = new GsonJsonSerializer();
+  @Override
+  public void configure(Config config) {
+
+  }
+
+  @Override
+  public JsonSerializer<JsonElement> getSerializer() {
+    return serializer;
+  }
+
+  @Override
+  public String getValue(String fieldName, JsonElement record)
+      throws FieldMappingException {
+    assert record.isJsonObject();
+    JsonObject jsonObject = record.getAsJsonObject();
+    if (jsonObject.has(fieldName)) {
+      return jsonObject.get(fieldName).getAsString();
+    } else {
+      throw new FieldMappingException("Could not find field :" + fieldName);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java
new file mode 100644
index 0000000..d2edb53
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+/**
+ * A class to hold exceptions thrown by {@link JsonSerializer}s.
+ */
+public class SerializationException extends Exception {
+  public SerializationException(Exception e) {
+    super(e);
+  }
+
+  public SerializationException(String s, Exception exception) {
+    super(s, exception);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java
new file mode 100644
index 0000000..5aa909b
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.typemapping;
+
+import java.io.Closeable;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * An interface that enables the ElasticSearch writer to work with different types of records.
+ * Supports serialization and id-getter capabilities
+ */
+public interface TypeMapper<T> extends Closeable {
+
+  void configure(Config config);
+
+  JsonSerializer<T> getSerializer();
+
+  String getValue(String fieldName, T record) throws FieldMappingException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java
new file mode 100644
index 0000000..cb6ed15
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncWriterManager;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.BufferedAsyncDataWriter;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+import org.apache.gobblin.writer.SequentialBasedBatchAccumulator;
+
+import com.google.gson.JsonObject;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.State;
+
+public class ElasticsearchDataWriterBuilder extends DataWriterBuilder {
+
+  @Override
+  public DataWriter build() throws IOException {
+
+    State state = this.destination.getProperties();
+    Properties taskProps = state.getProperties();
+    Config config = ConfigUtils.propertiesToConfig(taskProps);
+
+    SequentialBasedBatchAccumulator<JsonObject> batchAccumulator = new SequentialBasedBatchAccumulator<>(taskProps);
+
+    BatchAsyncDataWriter asyncDataWriter;
+    switch (ElasticsearchWriterConfigurationKeys.ClientType.valueOf(
+        ConfigUtils.getString(config,
+            ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE,
+            ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE_DEFAULT).toUpperCase())) {
+      case REST: {
+        asyncDataWriter = new ElasticsearchRestWriter(config);
+        break;
+      }
+      case TRANSPORT: {
+        asyncDataWriter = new ElasticsearchTransportClientWriter(config);
+        break;
+      }
+      default: {
+        throw new IllegalArgumentException("Need to specify which "
+            + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE
+            + " client to use (rest/transport)");
+      }
+    }
+    BufferedAsyncDataWriter bufferedAsyncDataWriter = new BufferedAsyncDataWriter(batchAccumulator, asyncDataWriter);
+
+    double failureAllowance = ConfigUtils.getDouble(config, ElasticsearchWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG,
+        ElasticsearchWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT) / 100.0;
+    boolean retriesEnabled = ConfigUtils.getBoolean(config, ElasticsearchWriterConfigurationKeys.RETRIES_ENABLED,
+        ElasticsearchWriterConfigurationKeys.RETRIES_ENABLED_DEFAULT);
+    int maxRetries = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.MAX_RETRIES,
+        ElasticsearchWriterConfigurationKeys.MAX_RETRIES_DEFAULT);
+
+
+    return AsyncWriterManager.builder()
+        .failureAllowanceRatio(failureAllowance)
+        .retriesEnabled(retriesEnabled)
+        .numRetries(maxRetries)
+        .config(config)
+        .asyncDataWriter(bufferedAsyncDataWriter)
+        .build();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
new file mode 100644
index 0000000..7cd77da
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.password.PasswordManager;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.Batch;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.GenericWriteResponse;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+import org.apache.http.HttpHost;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class ElasticsearchRestWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter<Object> {
+
+  private final RestHighLevelClient client;
+  private final RestClient lowLevelClient;
+
+  ElasticsearchRestWriter(Config config)
+      throws IOException {
+    super(config);
+
+
+    int threadCount = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_DEFAULT);
+    try {
+
+      PasswordManager passwordManager = PasswordManager.getInstance();
+      Boolean sslEnabled = ConfigUtils.getBoolean(config,
+          ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED,
+          ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT);
+      if (sslEnabled) {
+
+        // keystore
+        String keyStoreType = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE,
+                ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT);
+        String keyStoreFilePassword = passwordManager.readPassword(ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD, ""));
+        String identityFilepath = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION, "");
+
+        // truststore
+        String trustStoreType = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE,
+                ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT);
+        String trustStoreFilePassword = passwordManager.readPassword(ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD, ""));
+        String cacertsFilepath = ConfigUtils
+            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION, "");
+        String truststoreAbsolutePath = Paths.get(cacertsFilepath).toAbsolutePath().normalize().toString();
+        log.info("Truststore absolutePath is:" + truststoreAbsolutePath);
+
+
+        this.lowLevelClient =
+            buildRestClient(this.hostAddresses, threadCount, true, keyStoreType, keyStoreFilePassword, identityFilepath,
+                trustStoreType, trustStoreFilePassword, cacertsFilepath);
+      }
+      else {
+        this.lowLevelClient = buildRestClient(this.hostAddresses, threadCount);
+      }
+      client = new RestHighLevelClient(this.lowLevelClient);
+
+      log.info("Elasticsearch Rest Writer configured successfully with: indexName={}, "
+              + "indexType={}, idMappingEnabled={}, typeMapperClassName={}, ssl={}",
+          this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper.getClass().getCanonicalName(),
+          sslEnabled);
+
+    } catch (Exception e) {
+      throw new IOException("Failed to instantiate rest elasticsearch client", e);
+    }
+  }
+
+  @Override
+  int getDefaultPort() {
+    return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_REST_WRITER_DEFAULT_PORT;
+  }
+
+
+  private static RestClient buildRestClient(List<InetSocketTransportAddress> hosts, int threadCount)
+      throws Exception {
+    return buildRestClient(hosts, threadCount, false, null, null, null, null, null, null);
+  }
+
+
+  //TODO: Support pass through of configuration (e.g. timeouts etc) of rest client from above
+  private static RestClient buildRestClient(List<InetSocketTransportAddress> hosts, int threadCount, boolean sslEnabled,
+      String keyStoreType, String keyStoreFilePassword, String identityFilepath, String trustStoreType,
+      String trustStoreFilePassword, String cacertsFilepath) throws Exception {
+
+
+    HttpHost[] httpHosts = new HttpHost[hosts.size()];
+    String scheme = sslEnabled?"https":"http";
+    for (int h = 0; h < httpHosts.length; h++) {
+      InetSocketTransportAddress host = hosts.get(h);
+      httpHosts[h] = new HttpHost(host.getAddress(), host.getPort(), scheme);
+    }
+
+    RestClientBuilder builder = RestClient.builder(httpHosts);
+
+    if (sslEnabled) {
+      log.info("ssl configuration: trustStoreType = {}, cacertsFilePath = {}", trustStoreType, cacertsFilepath);
+      KeyStore truststore = KeyStore.getInstance(trustStoreType);
+      FileInputStream trustInputStream = new FileInputStream(cacertsFilepath);
+      try {
+        truststore.load(trustInputStream, trustStoreFilePassword.toCharArray());
+      }
+      finally {
+        trustInputStream.close();
+      }
+      SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
+
+      log.info("ssl key configuration: keyStoreType = {}, keyFilePath = {}", keyStoreType, identityFilepath);
+
+      KeyStore keystore = KeyStore.getInstance(keyStoreType);
+      FileInputStream keyInputStream = new FileInputStream(identityFilepath);
+      try {
+        keystore.load(keyInputStream, keyStoreFilePassword.toCharArray());
+      }
+      finally {
+        keyInputStream.close();
+      }
+      sslBuilder.loadKeyMaterial(keystore, keyStoreFilePassword.toCharArray());
+
+      final SSLContext sslContext = sslBuilder.build();
+      builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
+          // Set ssl context
+          .setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier())
+          // Configure number of threads for clients
+          .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
+    } else {
+      builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
+          // Configure number of threads for clients
+          .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
+    }
+
+    // Configure timeouts
+    builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
+        .setConnectionRequestTimeout(0)); // Important, otherwise the client has spurious timeouts
+
+    return builder.build();
+  }
+
+  @Override
+  public Future<WriteResponse> write(final Batch<Object> batch, @Nullable WriteCallback callback) {
+
+    Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback);
+    try {
+      client.bulkAsync(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener());
+      return preparedBatch.getSecond().getFuture();
+    }
+    catch (Exception e) {
+      throw new RuntimeException("Caught unexpected exception while calling bulkAsync API", e);
+    }
+  }
+
+
+
+  @Override
+  public void flush() throws IOException {
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    this.lowLevelClient.close();
+  }
+
+  @VisibleForTesting
+  public RestHighLevelClient getRestHighLevelClient() {
+    return this.client;
+  }
+
+  @VisibleForTesting
+  public RestClient getRestLowLevelClient() {
+    return this.lowLevelClient;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java
new file mode 100644
index 0000000..bb26fb5
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.Batch;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.GenericWriteResponseWrapper;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+import org.apache.gobblin.writer.WriteResponseFuture;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+class ElasticsearchTransportClientWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter<Object> {
+
+  private final TransportClient client;
+
+  ElasticsearchTransportClientWriter(Config config) throws UnknownHostException {
+    super(config);
+    // Check if ssl is being configured, throw error that transport client does not support ssl
+    Preconditions.checkArgument(!ConfigUtils.getBoolean(config,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED, false),
+        "Transport client does not support ssl, try the Rest client instead");
+
+    this.client = createTransportClient(config);
+
+    log.info("ElasticsearchWriter configured successfully with: indexName={}, indexType={}, idMappingEnabled={}, typeMapperClassName={}",
+        this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper);
+  }
+
+  @Override
+  int getDefaultPort() {
+    return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_TRANSPORT_WRITER_DEFAULT_PORT;
+  }
+
+  @Override
+  public Future<WriteResponse> write(Batch<Object> batch, @Nullable WriteCallback callback) {
+
+    Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback);
+    client.bulk(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener());
+    return preparedBatch.getSecond().getFuture();
+
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // Elasticsearch client doesn't support a flush method
+  }
+
+  @Override
+  public void close() throws IOException {
+    log.info("Got a close call in ElasticSearchTransportWriter");
+    super.close();
+    this.client.close();
+  }
+
+  @VisibleForTesting
+  TransportClient getTransportClient() {
+    return this.client;
+  }
+
+  private TransportClient createTransportClient(Config config) throws UnknownHostException {
+    TransportClient transportClient;
+
+    // Set TransportClient settings
+    Settings.Builder settingsBuilder = Settings.builder();
+    if (config.hasPath(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SETTINGS)) {
+      settingsBuilder.put(ConfigUtils.configToProperties(config,
+              ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SETTINGS));
+    }
+    settingsBuilder.put("client.transport.ignore_cluster_name",true);
+    settingsBuilder.put("client.transport.sniff", true);
+    transportClient = new PreBuiltTransportClient(settingsBuilder.build());
+    this.hostAddresses.forEach(transportClient::addTransportAddress);
+    return transportClient;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java
new file mode 100644
index 0000000..5238b50
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.elasticsearch.typemapping.JsonSerializer;
+import org.apache.gobblin.elasticsearch.typemapping.TypeMapper;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.Batch;
+import org.apache.gobblin.writer.WriteCallback;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A base class for different types of Elasticsearch writers
+ */
+@Slf4j
+public abstract class ElasticsearchWriterBase implements Closeable {
+  protected final String indexName;
+  protected final String indexType;
+  protected final TypeMapper typeMapper;
+  protected final JsonSerializer serializer;
+  protected final boolean idMappingEnabled;
+  protected final String idFieldName;
+  List<InetSocketTransportAddress> hostAddresses;
+  protected final MalformedDocPolicy malformedDocPolicy;
+
+  ElasticsearchWriterBase(Config config)
+      throws UnknownHostException {
+
+    this.indexName = config.getString(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME);
+    Preconditions.checkNotNull(this.indexName, "Index Name not provided. Please set "
+        + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME);
+    Preconditions.checkArgument(this.indexName.equals(this.indexName.toLowerCase()),
+        "Index name must be lowercase, you provided " + this.indexName);
+    this.indexType = config.getString(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE);
+    Preconditions.checkNotNull(this.indexName, "Index Type not provided. Please set "
+        + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE);
+    this.idMappingEnabled = ConfigUtils.getBoolean(config,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_DEFAULT);
+    this.idFieldName = ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_FIELD,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_FIELD_DEFAULT);
+    String typeMapperClassName = ConfigUtils.getString(config,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS_DEFAULT);
+    if (typeMapperClassName.isEmpty()) {
+      throw new IllegalArgumentException(this.getClass().getCanonicalName() + " needs to be configured with "
+          + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS + " to enable type mapping");
+    }
+    try {
+      Class<?> typeMapperClass = (Class<?>) Class.forName(typeMapperClassName);
+
+      this.typeMapper = (TypeMapper) ConstructorUtils.invokeConstructor(typeMapperClass);
+      this.typeMapper.configure(config);
+      this.serializer = this.typeMapper.getSerializer();
+    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+      log.error("Failed to instantiate type-mapper from class " + typeMapperClassName, e);
+      throw Throwables.propagate(e);
+    }
+
+    this.malformedDocPolicy = MalformedDocPolicy.valueOf(ConfigUtils.getString(config,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY,
+        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY_DEFAULT));
+
+    // If list is empty, connect to the default host and port
+    if (!config.hasPath(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS)) {
+      InetSocketTransportAddress hostAddress = new InetSocketTransportAddress(
+          InetAddress.getByName(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_DEFAULT_HOST),
+          getDefaultPort());
+      this.hostAddresses = new ArrayList<>(1);
+      this.hostAddresses.add(hostAddress);
+      log.info("Adding host {} to Elasticsearch writer", hostAddress);
+    } else {
+      // Get list of hosts
+      List<String> hosts = ConfigUtils.getStringList(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS);
+      // Add host addresses
+      Splitter hostSplitter = Splitter.on(":").trimResults();
+      this.hostAddresses = new ArrayList<>(hosts.size());
+      for (String host : hosts) {
+
+        List<String> hostSplit = hostSplitter.splitToList(host);
+        Preconditions.checkArgument(hostSplit.size() == 1 || hostSplit.size() == 2,
+            "Malformed host name for Elasticsearch writer: " + host + " host names must be of form [host] or [host]:[port]");
+
+        InetAddress hostInetAddress = InetAddress.getByName(hostSplit.get(0));
+        InetSocketTransportAddress hostAddress = null;
+
+        if (hostSplit.size() == 1) {
+          hostAddress = new InetSocketTransportAddress(hostInetAddress, this.getDefaultPort());
+        } else if (hostSplit.size() == 2) {
+          hostAddress = new InetSocketTransportAddress(hostInetAddress, Integer.parseInt(hostSplit.get(1)));
+        }
+        this.hostAddresses.add(hostAddress);
+        log.info("Adding host {} to Elasticsearch writer", hostAddress);
+      }
+    }
+  }
+
+  abstract int getDefaultPort();
+
+
+  protected Pair<BulkRequest, FutureCallbackHolder> prepareBatch(Batch<Object> batch, WriteCallback callback) {
+    BulkRequest bulkRequest = new BulkRequest();
+    final StringBuilder stringBuilder = new StringBuilder();
+    for (Object record : batch.getRecords()) {
+      try {
+        byte[] serializedBytes = this.serializer.serializeToJson(record);
+        log.debug("serialized record: {}", serializedBytes);
+        IndexRequest indexRequest = new IndexRequest(this.indexName, this.indexType)
+            .source(serializedBytes, 0, serializedBytes.length, XContentType.JSON);
+        if (this.idMappingEnabled) {
+          String id = this.typeMapper.getValue(this.idFieldName, record);
+          indexRequest.id(id);
+          stringBuilder.append(";").append(id);
+        }
+        bulkRequest.add(indexRequest);
+      }
+      catch (Exception e) {
+        log.error("Encountered exception {}", e);
+      }
+    }
+    FutureCallbackHolder futureCallbackHolder = new FutureCallbackHolder(callback,
+        exception -> log.error("Batch: {} failed on ids; {} with exception {}", batch.getId(),
+            stringBuilder.toString(), exception),
+        this.malformedDocPolicy);
+    return new Pair(bulkRequest, futureCallbackHolder);
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.serializer.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java
new file mode 100644
index 0000000..0dad29d
--- /dev/null
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper;
+
+
+public class ElasticsearchWriterConfigurationKeys {
+
+  private static final String ELASTICSEARCH_WRITER_PREFIX = "writer.elasticsearch";
+
+  private static String prefix(String value) { return ELASTICSEARCH_WRITER_PREFIX + "." + value;};
+
+  public static final String ELASTICSEARCH_WRITER_SETTINGS = prefix("settings");
+  public static final String ELASTICSEARCH_WRITER_HOSTS = prefix("hosts");
+  public static final String ELASTICSEARCH_WRITER_INDEX_NAME = prefix("index.name");
+  public static final String ELASTICSEARCH_WRITER_INDEX_TYPE = prefix("index.type");
+  public static final String ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS = prefix("typeMapperClass");
+  public static final String ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS_DEFAULT = JsonTypeMapper.class.getCanonicalName();
+  public static final String ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED = prefix("useIdFromData");
+  public static final Boolean ELASTICSEARCH_WRITER_ID_MAPPING_DEFAULT = false;
+  public static final String ELASTICSEARCH_WRITER_ID_FIELD = prefix("idFieldName");
+  public static final String ELASTICSEARCH_WRITER_ID_FIELD_DEFAULT = "id";
+  public static final String ELASTICSEARCH_WRITER_CLIENT_TYPE = prefix("client.type");
+  public static final String ELASTICSEARCH_WRITER_CLIENT_TYPE_DEFAULT = "REST";
+  public static final String ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE = prefix("client.threadPoolSize");
+  public static final int ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_DEFAULT = 5;
+  public static final String ELASTICSEARCH_WRITER_SSL_ENABLED=prefix("ssl.enabled");
+  public static final boolean ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT=false;
+  public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE=prefix("ssl.keystoreType");
+  public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT = "pkcs12";
+  public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD=prefix("ssl.keystorePassword");
+  public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION=prefix("ssl.keystoreLocation");
+  public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE=prefix("ssl.truststoreType");
+  public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT = "jks";
+  public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION=prefix("ssl.truststoreLocation");
+  public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD=prefix("ssl.truststorePassword");
+  public static final String ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY = prefix("malformedDocPolicy");
+  public static final String ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY_DEFAULT = "FAIL";
+
+  //Async Writer Configuration
+  public static final String RETRIES_ENABLED = prefix("retriesEnabled");
+  public static final boolean RETRIES_ENABLED_DEFAULT = true;
+  public static final String MAX_RETRIES = prefix("maxRetries");
+  public static final int MAX_RETRIES_DEFAULT = 5;
+  static final String FAILURE_ALLOWANCE_PCT_CONFIG = prefix("failureAllowancePercentage");
+  static final double FAILURE_ALLOWANCE_PCT_DEFAULT = 0.0;
+
+  public enum ClientType {
+    TRANSPORT,
+    REST
+  }
+
+  public static final String ELASTICSEARCH_WRITER_DEFAULT_HOST = "localhost";
+  public static final int ELASTICSEARCH_TRANSPORT_WRITER_DEFAULT_PORT = 9300;
+  public static final int ELASTICSEARCH_REST_WRITER_DEFAULT_PORT = 9200;
+}