You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2019/11/13 01:52:53 UTC

[flume] branch trunk updated: FLUME-3345 Add Kudu Flume Sinks

This is an automated email from the ASF dual-hosted git repository.

mpercy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9dafe98  FLUME-3345 Add Kudu Flume Sinks
9dafe98 is described below

commit 9dafe98972a652aa1a202c6df9c08139d2bea592
Author: Grant Henke <gh...@cloudera.com>
AuthorDate: Wed Sep 18 16:43:57 2019 -0500

    FLUME-3345 Add Kudu Flume Sinks
    
    This patch adds the Flume sinks that were previously maintained
    in the Apache Kudu project.
---
 .../config/checkstyle/checkstyle-suppressions.xml  |   2 +-
 flume-ng-sinks/flume-ng-kudu-sink/pom.xml          | 192 ++++++++++
 .../sink/kudu/AvroKuduOperationsProducer.java      | 282 +++++++++++++++
 .../flume/sink/kudu/KuduOperationsProducer.java    |  54 +++
 .../java/org/apache/flume/sink/kudu/KuduSink.java  | 315 ++++++++++++++++
 .../sink/kudu/KuduSinkConfigurationConstants.java  |  77 ++++
 .../sink/kudu/RegexpKuduOperationsProducer.java    | 398 +++++++++++++++++++++
 .../kudu/SimpleKeyedKuduOperationsProducer.java    | 136 +++++++
 .../sink/kudu/SimpleKuduOperationsProducer.java    |  92 +++++
 .../apache/flume/sink/kudu/KuduSinkTestUtil.java   |  99 +++++
 .../sink/kudu/TestAvroKuduOperationsProducer.java  | 183 ++++++++++
 .../sink/kudu/TestKeyedKuduOperationsProducer.java | 189 ++++++++++
 .../org/apache/flume/sink/kudu/TestKuduSink.java   | 213 +++++++++++
 .../kudu/TestRegexpKuduOperationsProducer.java     | 184 ++++++++++
 ...TestRegexpKuduOperationsProducerParseError.java | 291 +++++++++++++++
 .../apache/flume/sink/kudu/TestSecureKuduSink.java | 126 +++++++
 .../src/test/resources/log4j2.xml                  |  32 ++
 .../resources/testAvroKuduOperationsProducer.avsc  |  11 +
 flume-ng-sinks/pom.xml                             |   1 +
 pom.xml                                            |   5 +-
 20 files changed, 2879 insertions(+), 3 deletions(-)

diff --git a/build-support/src/main/resources/config/checkstyle/checkstyle-suppressions.xml b/build-support/src/main/resources/config/checkstyle/checkstyle-suppressions.xml
index 5981468..0636775 100644
--- a/build-support/src/main/resources/config/checkstyle/checkstyle-suppressions.xml
+++ b/build-support/src/main/resources/config/checkstyle/checkstyle-suppressions.xml
@@ -29,7 +29,7 @@ under the License.
 
     <!-- Suppress all style checks for generated code -->
     <suppress checks=".*"
-              files="generated-sources|com[/\\]cloudera[/\\]flume[/\\]handlers[/\\]thrift|org[/\\]apache[/\\]flume[/\\]thrift[/\\]|org[/\\]apache[/\\]flume[/\\]source[/\\]scribe|ProtosFactory.java"/>
+              files="generated-sources|com[/\\]cloudera[/\\]flume[/\\]handlers[/\\]thrift|org[/\\]apache[/\\]flume[/\\]thrift[/\\]|org[/\\]apache[/\\]flume[/\\]source[/\\]scribe|ProtosFactory.java|avro"/>
 
     <!-- The "legacy" sources have a weird camelCaps package name -->
     <suppress checks="PackageName"
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/pom.xml b/flume-ng-sinks/flume-ng-kudu-sink/pom.xml
new file mode 100644
index 0000000..ee0665d
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/pom.xml
@@ -0,0 +1,192 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>flume-ng-sinks</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.10.0-SNAPSHOT</version>
+  </parent>
+  <groupId>org.apache.flume.flume-ng-sinks</groupId>
+  <artifactId>flume-ng-kudu-sink</artifactId>
+  <name>Flume Kudu Sink</name>
+
+  <properties>
+    <exclude.tests>None</exclude.tests>
+  </properties>
+
+  <build>
+    <extensions>
+      <!-- Used to find the right kudu-binary artifact with the Maven
+           property ${os.detected.classifier} -->
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>1.6.2</version>
+      </extension>
+    </extensions>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <testSourceDirectory>${basedir}/src/test/resources</testSourceDirectory>
+              <testIncludes>
+                <testInclude>**/*.avsc</testInclude>
+              </testIncludes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>${exclude.tests}</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>${kudu.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-test-utils</artifactId>
+      <version>${kudu.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <!-- OS-activated profiles that decide whether the Kudu tests run and which
+       kudu-binary test artifact is added. -->
+  <profiles>
+    <profile>
+      <id>kudu-windows</id>
+      <activation>
+        <os>
+          <family>Windows</family>
+        </os>
+      </activation>
+      <properties>
+        <!-- Kudu tests do not support Windows. -->
+        <!-- Replaces the default value "None" of exclude.tests, which the surefire
+             <excludes> configuration in <build> reads. -->
+        <exclude.tests>**/*.java</exclude.tests>
+      </properties>
+    </profile>
+    <!-- Adds the kudu-binary test artifact; its classifier comes from the
+         os.detected.classifier property (see the os-maven-plugin extension in <build>).
+         NOTE(review): Maven's "Unix" OS family may also match macOS, in which case this
+         profile and kudu-mac would both be active there; confirm that is intended. -->
+    <profile>
+      <id>kudu-linux</id>
+      <activation>
+        <os>
+          <family>Unix</family>
+        </os>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.kudu</groupId>
+          <artifactId>kudu-binary</artifactId>
+          <version>${kudu.version}</version>
+          <classifier>${os.detected.classifier}</classifier>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <!-- Declares the same kudu-binary test dependency as kudu-linux, activated for the
+         mac OS family. -->
+    <profile>
+      <id>kudu-mac</id>
+      <activation>
+        <os>
+          <family>mac</family>
+        </os>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.kudu</groupId>
+          <artifactId>kudu-binary</artifactId>
+          <version>${kudu.version}</version>
+          <classifier>${os.detected.classifier}</classifier>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/AvroKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/AvroKuduOperationsProducer.java
new file mode 100644
index 0000000..bdc3b79
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/AvroKuduOperationsProducer.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+/**
+ * An Avro serializer that generates one operation per event by deserializing the event
+ * body as an Avro record and mapping its fields to columns in a Kudu table.
+ *
+ * <p><strong>Avro Kudu Operations Producer configuration parameters</strong>
+ * <table cellpadding=3 cellspacing=0 border=1
+ *        summary="Avro Kudu Operations Producer configuration parameters">
+ * <tr><th>Property Name</th>
+ *   <th>Default</th>
+ *   <th>Required?</th>
+ *   <th>Description</th></tr>
+ * <tr>
+ *   <td>producer.operation</td>
+ *   <td>upsert</td>
+ *   <td>No</td>
+ *   <td>The operation used to write events to Kudu.
+ *   Supported operations are 'insert' and 'upsert'</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.schemaPath</td>
+ *   <td></td>
+ *   <td>No</td>
+ *   <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies.
+ *   It's used whenever an event does not include its own schema. If not specified, the
+ *   schema must be specified on a per-event basis, either by url or as a literal.
+ *   Schemas must be record type.</td>
+ * </tr>
+ * </table>
+ */
+public class AvroKuduOperationsProducer implements KuduOperationsProducer {
+  public static final String OPERATION_PROP = "operation";
+  public static final String SCHEMA_PROP = "schemaPath";
+  public static final String DEFAULT_OPERATION = "upsert";
+  public static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";
+  public static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";
+
+  private String operation = "";
+  private GenericRecord reuse;
+  private KuduTable table;
+  private String defaultSchemaUrl;
+
+  /**
+   * The binary decoder to reuse for event parsing.
+   */
+  private BinaryDecoder decoder = null;
+
+  /**
+   * A cache of schemas retrieved by URL to avoid re-parsing the schema.
+   *
+   * <p>URLs starting with {@code hdfs:/} (compared case-insensitively) are opened through
+   * the Hadoop {@link FileSystem} API; every other URL is opened with
+   * {@link URL#openStream()}.
+   */
+  private static final LoadingCache<String, Schema> schemasFromURL =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<String, Schema>() {
+            @Override
+            public Schema load(String url) throws IOException {
+              // try-with-resources closes the stream whether or not parsing succeeds.
+              try (InputStream is = open(url)) {
+                return new Schema.Parser().parse(is);
+              }
+            }
+
+            /**
+             * Opens the schema stream for {@code url}. The Hadoop FileSystem is looked up
+             * only for HDFS URLs, so loading any other URL no longer depends on Hadoop
+             * being able to resolve a FileSystem implementation for its scheme.
+             */
+            private InputStream open(String url) throws IOException {
+              if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
+                return FileSystem.get(URI.create(url), conf).open(new Path(url));
+              }
+              return new URL(url).openStream();
+            }
+          });
+
+  /**
+   * A cache of literal schemas to avoid re-parsing the schema.
+   */
+  private static final LoadingCache<String, Schema> schemasFromLiteral =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<String, Schema>() {
+            @Override
+            public Schema load(String literal) {
+              Preconditions.checkNotNull(literal,
+                  "Schema literal cannot be null without a Schema URL");
+              return new Schema.Parser().parse(literal);
+            }
+          });
+
+  /**
+   * A cache of DatumReaders per schema.
+   */
+  private static final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
+            @Override
+            public DatumReader<GenericRecord> load(Schema schema) {
+              return new GenericDatumReader<>(schema);
+            }
+          });
+
+  private static final Configuration conf = new Configuration();
+
+  /**
+   * No-argument constructor. {@code KuduSink.configure} instantiates the configured
+   * producer class reflectively through its no-argument constructor.
+   */
+  public AvroKuduOperationsProducer() {
+  }
+
+  /**
+   * Reads the producer settings: the write operation (defaulting to
+   * {@link #DEFAULT_OPERATION}) and, when present, the default schema location.
+   *
+   * @param context the producer-scoped configuration
+   */
+  @Override
+  public void configure(Context context) {
+    String configuredSchema = context.getString(SCHEMA_PROP);
+    if (configuredSchema != null) {
+      this.defaultSchemaUrl = configuredSchema;
+    }
+    this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION);
+  }
+
+  /**
+   * Stores the table that {@link #getOperations(Event)} creates operations against and
+   * whose schema drives the field-to-column mapping.
+   *
+   * @param table the Kudu table to write to
+   */
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Decodes the event body as an Avro record and turns it into a single Kudu operation of
+   * the configured kind.
+   *
+   * @param event the event whose body holds a binary-encoded Avro record
+   * @return a one-element list holding the operation for this event
+   * @throws FlumeException if the body cannot be deserialized or the configured operation
+   *     is neither 'upsert' nor 'insert'
+   */
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    DatumReader<GenericRecord> datumReader = readers.getUnchecked(getSchema(event));
+    // The decoder and the record instance are reused from one event to the next.
+    decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
+    try {
+      reuse = datumReader.read(reuse, decoder);
+    } catch (IOException e) {
+      throw new FlumeException("Cannot deserialize event", e);
+    }
+
+    String normalizedOperation = operation.toLowerCase(Locale.ENGLISH);
+    Operation kuduOp;
+    if ("upsert".equals(normalizedOperation)) {
+      kuduOp = table.newUpsert();
+    } else if ("insert".equals(normalizedOperation)) {
+      kuduOp = table.newInsert();
+    } else {
+      throw new FlumeException(String.format("Unexpected operation %s", operation));
+    }
+    setupOp(kuduOp, reuse);
+    return Collections.singletonList(kuduOp);
+  }
+
+  /**
+   * Copies the fields of {@code record} into the row of {@code op}, matching Avro fields
+   * to Kudu columns by name.
+   *
+   * <p>A field whose value is null is written as NULL when the column is nullable and is
+   * left unset otherwise. INT8 and INT16 columns accept any boxed integral value that fits
+   * the column's range. BINARY columns accept both {@code byte[]} and
+   * {@link java.nio.ByteBuffer} values.
+   *
+   * @param op the operation whose row is populated
+   * @param record the decoded Avro record supplying the values
+   * @throws FlumeException if a value cannot be coerced to its column's type, does not fit
+   *     the column's range, or the column type is not handled here
+   */
+  private void setupOp(Operation op, GenericRecord record) {
+    PartialRow row = op.getRow();
+    for (ColumnSchema col : table.getSchema().getColumns()) {
+      String name = col.getName();
+      Object value = record.get(name);
+      if (value == null) {
+        // Set null if nullable, otherwise leave unset for possible Kudu default.
+        if (col.isNullable()) {
+          row.setNull(name);
+        }
+      } else {
+        // Avro doesn't support 8- or 16-bit integer types, but we'll allow them to be passed as
+        // a larger type.
+        try {
+          switch (col.getType()) {
+            case BOOL:
+              row.addBoolean(name, (boolean) value);
+              break;
+            case INT8:
+              // A plain (byte) cast on a boxed Integer/Long throws ClassCastException, so
+              // narrow explicitly after a range check.
+              row.addByte(name,
+                  (byte) integralInRange(value, name, Byte.MIN_VALUE, Byte.MAX_VALUE));
+              break;
+            case INT16:
+              row.addShort(name,
+                  (short) integralInRange(value, name, Short.MIN_VALUE, Short.MAX_VALUE));
+              break;
+            case INT32:
+              row.addInt(name, (int) value);
+              break;
+            case INT64: // Fall through
+            case UNIXTIME_MICROS:
+              row.addLong(name, (long) value);
+              break;
+            case FLOAT:
+              row.addFloat(name, (float) value);
+              break;
+            case DOUBLE:
+              row.addDouble(name, (double) value);
+              break;
+            case STRING:
+              row.addString(name, value.toString());
+              break;
+            case BINARY:
+              if (value instanceof java.nio.ByteBuffer) {
+                // Copy the bytes out: the record (and so possibly its buffers) is reused
+                // for the next event, before the operation has been flushed.
+                java.nio.ByteBuffer view = ((java.nio.ByteBuffer) value).duplicate();
+                byte[] copy = new byte[view.remaining()];
+                view.get(copy);
+                row.addBinary(name, copy);
+              } else {
+                row.addBinary(name, (byte[]) value);
+              }
+              break;
+            default:
+              throw new FlumeException(String.format(
+                  "Unrecognized type %s for column %s", col.getType().toString(), name));
+          }
+        } catch (ClassCastException e) {
+          throw new FlumeException(
+              String.format("Failed to coerce value for column '%s' to type %s",
+              col.getName(),
+              col.getType()), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns {@code value} as a long after checking that it is a boxed integral type and
+   * lies within {@code [min, max]}.
+   *
+   * @throws ClassCastException if {@code value} is not a Byte, Short, Integer or Long; the
+   *     caller reports this as a coercion failure
+   * @throws FlumeException if the value is outside the given range
+   */
+  private static long integralInRange(Object value, String column, long min, long max) {
+    boolean integralType = value instanceof Byte || value instanceof Short
+        || value instanceof Integer || value instanceof Long;
+    if (!integralType) {
+      throw new ClassCastException(value.getClass().getName() + " is not an integral type");
+    }
+    long integral = ((Number) value).longValue();
+    if (integral < min || integral > max) {
+      throw new FlumeException(String.format(
+          "Value %d for column '%s' is outside the range [%d, %d]",
+          integral, column, min, max));
+    }
+    return integral;
+  }
+
+  /**
+   * Resolves the Avro schema to use for {@code event}.
+   *
+   * <p>Precedence: the {@code flume.avro.schema.url} header, then the
+   * {@code flume.avro.schema.literal} header, then the configured default schema
+   * location.
+   *
+   * @param event the event whose headers may name or contain a schema
+   * @return the parsed schema, served from the schema caches
+   * @throws FlumeException if no schema source is available, or the schema cannot be
+   *     loaded or parsed
+   */
+  private Schema getSchema(Event event) throws FlumeException {
+    Map<String, String> headers = event.getHeaders();
+    String schemaUrl = headers.get(SCHEMA_URL_HEADER);
+    String schemaLiteral = headers.get(SCHEMA_LITERAL_HEADER);
+
+    // Report a missing schema outside the try block; inside it, the RuntimeException
+    // handler below would re-wrap this error as "Cannot parse schema".
+    if (schemaUrl == null && schemaLiteral == null && defaultSchemaUrl == null) {
+      throw new FlumeException(
+          String.format("No schema for event. " +
+              "Specify configuration property '%s' or event header '%s' or '%s'",
+              SCHEMA_PROP,
+              SCHEMA_URL_HEADER,
+              SCHEMA_LITERAL_HEADER));
+    }
+
+    try {
+      if (schemaUrl != null) {
+        return schemasFromURL.get(schemaUrl);
+      }
+      if (schemaLiteral != null) {
+        return schemasFromLiteral.get(schemaLiteral);
+      }
+      return schemasFromURL.get(defaultSchemaUrl);
+    } catch (ExecutionException e) {
+      throw new FlumeException("Cannot get schema", e);
+    } catch (RuntimeException e) {
+      throw new FlumeException("Cannot parse schema", e);
+    }
+  }
+
+  /**
+   * No-op implementation of {@code close()}.
+   */
+  @Override
+  public void close() {
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduOperationsProducer.java
new file mode 100644
index 0000000..84ef34f
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduOperationsProducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.util.List;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+/**
+ * Interface for an operations producer that produces Kudu Operations from
+ * Flume events.
+ *
+ * <p>As used by {@code KuduSink}: the producer is created through its no-argument
+ * constructor and configured when the sink is configured, {@link #initialize(KuduTable)}
+ * is called when the sink starts, {@link #getOperations(Event)} is called for each event
+ * taken from the channel, and {@link #close()} is called when the sink stops.
+ */
+public interface KuduOperationsProducer extends Configurable, AutoCloseable {
+  /**
+   * Initializes the operations producer. Called between configure and
+   * getOperations.
+   * @param table the KuduTable used to create Kudu Operation objects
+   */
+  void initialize(KuduTable table);
+
+  /**
+   * Returns the operations that should be written to Kudu as a result of this event.
+   * @param event Event to convert to one or more Operations
+   * @return List of Operations that should be written to Kudu
+   */
+  List<Operation> getOperations(Event event);
+
+  /**
+   * Cleans up any state. Called when the sink is stopped.
+   *
+   * <p>Redeclared without a {@code throws} clause, so implementations cannot throw
+   * checked exceptions from it.
+   */
+  @Override
+  void close();
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java
new file mode 100644
index 0000000..d5e15f8
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.PrivilegedAction;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.SessionConfiguration;
+
+/**
+ * A Flume sink that reads events from a channel and writes them to a Kudu table.
+ *
+ * <p><strong>Flume Kudu Sink configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1 summary="Flume Kudu Sink configuration parameters">
+ * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
+ * <tr><td>channel</td><td></td><td>Yes</td><td>The name of the Flume channel to read.</td></tr>
+ * <tr><td>type</td><td></td><td>Yes</td>
+ *     <td>Component name. Must be {@code org.apache.flume.sink.kudu.KuduSink}</td></tr>
+ * <tr><td>masterAddresses</td><td></td><td>Yes</td>
+ *     <td>Comma-separated list of "host:port" Kudu master addresses.
+ *     The port is optional.</td></tr>
+ * <tr><td>tableName</td><td></td><td>Yes</td>
+ *     <td>The name of the Kudu table to write to.</td></tr>
+ * <tr><td>batchSize</td><td>1000</td><td>No</td>
+ * <td>The maximum number of events the sink takes from the channel per transaction.</td></tr>
+ * <tr><td>ignoreDuplicateRows</td><td>true</td>
+ *     <td>No</td><td>Whether to ignore duplicate primary key errors caused by inserts.</td></tr>
+ * <tr><td>timeoutMillis</td><td>10000</td><td>No</td>
+ *     <td>Timeout period for Kudu write operations, in milliseconds.</td></tr>
+ * <tr><td>producer</td><td>{@link SimpleKuduOperationsProducer}</td><td>No</td>
+ *     <td>The fully-qualified class name of the {@link KuduOperationsProducer}
+ *     the sink should use.</td></tr>
+ * <tr><td>producer.*</td><td></td><td>(Varies by operations producer)</td>
+ *     <td>Configuration properties to pass to the operations producer implementation.</td></tr>
+ * </table>
+ *
+ * <p><strong>Installation</strong>
+ *
+ * <p>After building the sink, in order to use it with Flume, place the file named
+ * <tt>kudu-flume-sink-<em>VERSION</em>-jar-with-dependencies.jar</tt> in the
+ * Flume <tt>plugins.d</tt> directory under <tt>kudu-flume-sink/lib/</tt>.
+ *
+ * <p>For detailed instructions on using Flume's plugins.d mechanism, please see the plugins.d
+ * section of the <a href="https://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory">
+ *   Flume User Guide</a>.
+ */
+public class KuduSink extends AbstractSink implements Configurable {
+  private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
+  private static final int DEFAULT_BATCH_SIZE = 1000;
+  private static final Long DEFAULT_TIMEOUT_MILLIS =
+          AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+  private static final String DEFAULT_KUDU_OPERATION_PRODUCER =
+          "org.apache.flume.sink.kudu.SimpleKuduOperationsProducer";
+  private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
+
+  private String masterAddresses;
+  private String tableName;
+  private int batchSize;
+  private long timeoutMillis;
+  private boolean ignoreDuplicateRows;
+  private KuduTable table;
+  private KuduSession session;
+  private KuduClient client;
+  private KuduOperationsProducer operationsProducer;
+  private SinkCounter sinkCounter;
+  private PrivilegedExecutor privilegedExecutor;
+
+  /**
+   * Creates a sink without a client; {@link #start()} then builds one from the configured
+   * master addresses.
+   */
+  public KuduSink() {
+    this(null);
+  }
+
+  /**
+   * Creates a sink that uses the given client instead of building its own in
+   * {@link #start()}.
+   *
+   * @param kuduClient the client to use, or {@code null} to have one created on start
+   */
+  public KuduSink(KuduClient kuduClient) {
+    this.client = kuduClient;
+  }
+
+  @Override
+  public synchronized void start() {
+    Preconditions.checkState(table == null && session == null,
+        "Please call stop before calling start on an old instance.");
+
+    // Client is not null only inside tests.
+    if (client == null) {
+      // Creating client with FlumeAuthenticator.
+      client = privilegedExecutor.execute(
+        new PrivilegedAction<KuduClient>() {
+          @Override
+          public KuduClient run() {
+            return new KuduClient.KuduClientBuilder(masterAddresses).build();
+          }
+        }
+      );
+    }
+    session = client.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    session.setTimeoutMillis(timeoutMillis);
+    session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
+    session.setMutationBufferSpace(batchSize);
+
+    try {
+      table = client.openTable(tableName);
+    } catch (Exception ex) {
+      sinkCounter.incrementConnectionFailedCount();
+      String msg = String.format("Could not open Kudu table '%s'", tableName);
+      logger.error(msg, ex);
+      throw new FlumeException(msg, ex);
+    }
+    operationsProducer.initialize(table);
+
+    super.start();
+    sinkCounter.incrementConnectionCreatedCount();
+    sinkCounter.start();
+  }
+
+  /**
+   * Closes the operations producer, shuts down the Kudu client and clears the sink's
+   * connection state. Both close steps are always attempted.
+   *
+   * @throws FlumeException if closing the producer or the client failed; the cause is the
+   *     first failure and a later one is attached to it as a suppressed exception
+   */
+  @Override
+  public synchronized void stop() {
+    Exception ex = null;
+    try {
+      operationsProducer.close();
+    } catch (Exception e) {
+      ex = e;
+      logger.error("Error closing operations producer", e);
+    }
+    try {
+      if (client != null) {
+        client.shutdown();
+      }
+    } catch (Exception e) {
+      // Keep the first failure as the cause instead of overwriting it.
+      if (ex == null) {
+        ex = e;
+      } else {
+        ex.addSuppressed(e);
+      }
+      logger.error("Error closing client", e);
+    }
+    client = null;
+    table = null;
+    session = null;
+
+    sinkCounter.incrementConnectionClosedCount();
+    sinkCounter.stop();
+    // start() calls super.start(), so mirror it here to let the base class record the stop.
+    super.stop();
+    if (ex != null) {
+      throw new FlumeException("Error stopping sink", ex);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public synchronized void configure(Context context) {
+    masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
+    Preconditions.checkNotNull(masterAddresses,
+        "Missing master addresses. Please specify property '%s'.",
+        KuduSinkConfigurationConstants.MASTER_ADDRESSES);
+
+    tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
+    Preconditions.checkNotNull(tableName,
+        "Missing table name. Please specify property '%s'",
+        KuduSinkConfigurationConstants.TABLE_NAME);
+
+    batchSize = context.getInteger(KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE);
+    timeoutMillis = context.getLong(KuduSinkConfigurationConstants.TIMEOUT_MILLIS,
+                                    DEFAULT_TIMEOUT_MILLIS);
+    ignoreDuplicateRows = context.getBoolean(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
+                                             DEFAULT_IGNORE_DUPLICATE_ROWS);
+    String operationProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER);
+    String kerberosPrincipal =
+        context.getString(KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL);
+    String kerberosKeytab = context.getString(KuduSinkConfigurationConstants.KERBEROS_KEYTAB);
+    String proxyUser = context.getString(KuduSinkConfigurationConstants.PROXY_USER);
+
+    privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
+        kerberosPrincipal, kerberosKeytab).proxyAs(proxyUser);
+
+    // Check for operations producer, if null set default operations producer type.
+    if (operationProducerType == null || operationProducerType.isEmpty()) {
+      operationProducerType = DEFAULT_KUDU_OPERATION_PRODUCER;
+      logger.warn("No Kudu operations producer provided, using default");
+    }
+
+    Context producerContext = new Context();
+    producerContext.putAll(context.getSubProperties(
+        KuduSinkConfigurationConstants.PRODUCER_PREFIX));
+
+    try {
+      Class<? extends KuduOperationsProducer> clazz =
+          (Class<? extends KuduOperationsProducer>)
+          Class.forName(operationProducerType);
+      operationsProducer = clazz.getDeclaredConstructor().newInstance();
+      operationsProducer.configure(producerContext);
+    } catch (ClassNotFoundException | NoSuchMethodException |
+        InstantiationException | IllegalAccessException | InvocationTargetException e) {
+      logger.error("Could not instantiate Kudu operations producer" , e);
+      throw new RuntimeException(e);
+    }
+    sinkCounter = new SinkCounter(this.getName());
+  }
+
+  /**
+   * Returns the Kudu client currently held by this sink.
+   *
+   * @return the client, or {@code null} when none is held; {@link #stop()} clears it
+   */
+  public synchronized KuduClient getClient() {
+    return client;
+  }
+
+  /**
+   * Takes up to {@code batchSize} events from the channel inside one Flume transaction,
+   * applies the operations produced for them to the Kudu session and flushes the session.
+   *
+   * @return {@code BACKOFF} if the session still has pending operations or no event was
+   *     taken from the channel; {@code READY} otherwise
+   * @throws EventDeliveryException if the flush reports a row error or another checked
+   *     failure occurs; the transaction is rolled back first
+   * @throws RuntimeException wrapping any unchecked exception or error raised while
+   *     processing; the transaction is rolled back first
+   */
+  @Override
+  public synchronized Status process() throws EventDeliveryException {
+    if (session.hasPendingOperations()) {
+      // If for whatever reason we have pending operations, refuse to process
+      // more and tell the caller to try again a bit later. We don't want to
+      // pile on the KuduSession.
+      return Status.BACKOFF;
+    }
+
+    Channel channel = getChannel();
+    Transaction txn = channel.getTransaction();
+
+    txn.begin();
+
+    try {
+      long txnEventCount = 0;
+      for (; txnEventCount < batchSize; txnEventCount++) {
+        Event event = channel.take();
+        if (event == null) {
+          break;
+        }
+
+        // NOTE(review): if the producer throws part-way through a batch, operations
+        // already applied to the session are not flushed here; confirm how that interacts
+        // with the hasPendingOperations() check at the top of this method.
+        List<Operation> operations = operationsProducer.getOperations(event);
+        for (Operation o : operations) {
+          session.apply(o);
+        }
+      }
+
+      logger.debug("Flushing {} events", txnEventCount);
+      List<OperationResponse> responses = session.flush();
+      if (responses != null) {
+        for (OperationResponse response : responses) {
+          // Throw an EventDeliveryException if at least one of the responses was
+          // a row error. Row errors can occur for example when an event is inserted
+          // into Kudu successfully but the Flume transaction is rolled back for some reason,
+          // and a subsequent replay of the same Flume transaction leads to a
+          // duplicate key error since the row already exists in Kudu.
+          // Note: Duplicate keys will not be reported as errors if ignoreDuplicateRows
+          // is enabled in the config.
+          if (response.hasRowError()) {
+            throw new EventDeliveryException("Failed to flush one or more changes. " +
+                "Transaction rolled back: " + response.getRowError().toString());
+          }
+        }
+      }
+
+      if (txnEventCount == 0) {
+        sinkCounter.incrementBatchEmptyCount();
+      } else if (txnEventCount == batchSize) {
+        sinkCounter.incrementBatchCompleteCount();
+      } else {
+        sinkCounter.incrementBatchUnderflowCount();
+      }
+
+      txn.commit();
+
+      if (txnEventCount == 0) {
+        return Status.BACKOFF;
+      }
+
+      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+      return Status.READY;
+
+    } catch (Throwable e) {
+      txn.rollback();
+
+      // Log the failure once; both branches below rethrow it.
+      String msg = "Failed to commit transaction. Transaction rolled back.";
+      logger.error(msg, e);
+      if (e instanceof Error || e instanceof RuntimeException) {
+        throw new RuntimeException(e);
+      } else {
+        throw new EventDeliveryException(msg, e);
+      }
+    } finally {
+      txn.close();
+    }
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSinkConfigurationConstants.java
new file mode 100644
index 0000000..a39f82c
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSinkConfigurationConstants.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+public class KuduSinkConfigurationConstants {
+  /**
+   * Comma-separated list of "host:port" Kudu master addresses.
+   * The port is optional and defaults to the Kudu Java client's default master
+   * port.
+   */
+  public static final String MASTER_ADDRESSES = "masterAddresses";
+
+  /**
+   * The name of the table in Kudu to write to.
+   */
+  public static final String TABLE_NAME = "tableName";
+
+  /**
+   * The fully qualified class name of the KuduOperationsProducer class that the
+   * sink should use.
+   */
+  public static final String PRODUCER = "producer";
+
+  /**
+   * Prefix for configuration parameters that are passed to the
+   * KuduOperationsProducer.
+   */
+  public static final String PRODUCER_PREFIX = PRODUCER + ".";
+
+  /**
+   * Maximum number of events that the sink should take from the channel per
+   * transaction.
+   */
+  public static final String BATCH_SIZE = "batchSize";
+
+  /**
+   * Timeout period for Kudu operations, in milliseconds.
+   */
+  public static final String TIMEOUT_MILLIS = "timeoutMillis";
+
+  /**
+   * Whether to ignore duplicate primary key errors caused by inserts.
+   */
+  public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
+
+  /**
+   * Path to the keytab file used for authentication
+   */
+  public static final String KERBEROS_KEYTAB = "kerberosKeytab";
+
+  /**
+   * Kerberos principal used for authentication
+   */
+  public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+
+  /**
+   * The effective user if different from the kerberos principal
+   */
+  public static final String PROXY_USER = "proxyUser";
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/RegexpKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/RegexpKuduOperationsProducer.java
new file mode 100644
index 0000000..a380f97
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/RegexpKuduOperationsProducer.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Upsert;
+
+/**
+ * A regular expression operations producer that generates one or more Kudu
+ * {@link Insert} or {@link Upsert} operations per Flume {@link Event} by
+ * parsing the event {@code body} using a regular expression. Values are
+ * coerced to the types of the named columns in the Kudu table.
+ *
+ * <p>Example: If the Kudu table has the schema:
+ *
+ * <pre>
+ * key INT32
+ * name STRING</pre>
+ *
+ * <p>and {@code producer.pattern = (?<key>\\d+),(?<name>\\w+)} then
+ * {@code RegexpKuduOperationsProducer} will parse the string:
+ *
+ * <pre>|12345,Mike||54321,Todd|</pre>
+ *
+ * into the rows: {@code (key=12345, name=Mike)} and {@code (key=54321, name=Todd)}.
+ *
+ * <p>Note: This class relies on JDK7 named capturing groups, which are
+ * documented in {@link Pattern}. The name of each capturing group must
+ * correspond to a column name in the destination Kudu table.
+ *
+ * <p><strong><code>RegexpKuduOperationsProducer</code> Flume Configuration Parameters</strong></p>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1 summary="Flume Configuration Parameters">
+ * <tr>
+ *   <th>Property Name</th>
+ *   <th>Default</th>
+ *   <th>Required?</th>
+ *   <th>Description</th>
+ * </tr>
+ * <tr>
+ *   <td>producer.pattern</td>
+ *   <td></td>
+ *   <td>Yes</td>
+ *   <td>The regular expression used to parse the event body.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.encoding</td>
+ *   <td>utf-8</td>
+ *   <td>No</td>
+ *   <td>The character set of the event body.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.operation</td>
+ *   <td>upsert</td>
+ *   <td>No</td>
+ *   <td>Operation type used to write the event to Kudu. Must be either
+ *   {@code insert} or {@code upsert}.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.skipMissingColumn</td>
+ *   <td>false</td>
+ *   <td>No</td>
+ *   <td>
+ *   <b>@deprecated</b><br/> use {@code producer.missingColumnPolicy}
+ *   What to do if a column in the Kudu table has no corresponding capture group.
+ *   If set to {@code true}, a warning message is logged and the operation is still attempted.
+ *   If set to {@code false}, an exception is thrown and the sink will not process the
+ *   {@code Event}, causing a Flume {@code Channel} rollback.
+ * </tr>
+ * <tr>
+ *   <td>producer.skipBadColumnValue</td>
+ *   <td>false</td>
+ *   <td>No</td>
+ *   <td>
+ *   <b>@deprecated</b><br/> use {@code producer.badColumnValuePolicy}
+ *   What to do if a value in the pattern match cannot be coerced to the required type.
+ *   If set to {@code true}, a warning message is logged and the operation is still attempted.
+ *   If set to {@code false}, an exception is thrown and the sink will not process the
+ *   {@code Event}, causing a Flume {@code Channel} rollback.
+ * </tr>
+ * <tr>
+ *   <td>producer.skipUnmatchedRows</td>
+ *   <td>true</td>
+ *   <td>No</td>
+ *   <td>
+ *   <b>@deprecated</b><br/> use {@code producer.unmatchedRowPolicy}
+ *   Whether to log a warning about payloads that do not match the pattern. If set to
+ *   {@code false}, event bodies with no matches will be silently dropped.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.missingColumnPolicy</td>
+ *   <td>REJECT</td>
+ *   <td>No</td>
+ *   <td>What to do if a column in the Kudu table has no corresponding capture group.<br/>
+ *   If set to {@code REJECT}, an exception is thrown and the sink will not process the
+ *   {@code Event}, causing a Flume {@code Channel} rollback.<br/>
+ *   If set to {@code WARN}, a warning message is logged and the operation is still produced.<br/>
+ *   If set to {@code IGNORE}, the operation is still produced without any log message.
+ * </tr>
+ * <tr>
+ *   <td>producer.badColumnValuePolicy</td>
+ *   <td>REJECT</td>
+ *   <td>No</td>
+ *   <td>What to do if a value in the pattern match cannot be coerced to the required type.<br/>
+ *   If set to {@code REJECT}, an exception is thrown and the sink will not process the
+ *   {@code Event}, causing a Flume {@code Channel} rollback.<br/>
+ *   If set to {@code WARN}, a warning message is logged and the operation is still produced,
+ *   but does not include the given column.<br/>
+ *   If set to {@code IGNORE}, the operation is still produced, but does not include the given
+ *   column and does not log any message.
+ * </tr>
+ * <tr>
+ *   <td>producer.unmatchedRowPolicy</td>
+ *   <td>WARN</td>
+ *   <td>No</td>
+ *   <td>What to do if a payload does not match the pattern.<br/>
+ *   If set to {@code REJECT}, an exception is thrown and the sink will not process the
+ *   {@code Event}, causing a Flume {@code Channel} rollback.<br/>
+ *   If set to {@code WARN}, a warning message is logged and the row is skipped,
+ *   not producing an operation.<br/>
+ *   If set to {@code IGNORE}, the row is skipped without any log message.
+ * </tr>
+ * </table>
+ *
+ * @see Pattern
+ */
+public class RegexpKuduOperationsProducer implements KuduOperationsProducer {
+  private static final Logger logger = LoggerFactory.getLogger(RegexpKuduOperationsProducer.class);
+  private static final String INSERT = "insert";
+  private static final String UPSERT = "upsert";
+  private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT);
+
+  public static final String PATTERN_PROP = "pattern";
+  public static final String ENCODING_PROP = "encoding";
+  public static final String DEFAULT_ENCODING = "utf-8";
+  public static final String OPERATION_PROP = "operation";
+  public static final String DEFAULT_OPERATION = UPSERT;
+  @Deprecated
+  public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
+  @Deprecated
+  public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
+  @Deprecated
+  public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
+  @Deprecated
+  public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
+  @Deprecated
+  public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows";
+  @Deprecated
+  public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;
+  public static final String MISSING_COLUMN_POLICY_PROP = "missingColumnPolicy";
+  public static final ParseErrorPolicy DEFAULT_MISSING_COLUMN_POLICY = ParseErrorPolicy.REJECT;
+  public static final String BAD_COLUMN_VALUE_POLICY_PROP = "badColumnValuePolicy";
+  public static final ParseErrorPolicy DEFAULT_BAD_COLUMN_VALUE_POLICY = ParseErrorPolicy.REJECT;
+  public static final String UNMATCHED_ROW_POLICY_PROP = "unmatchedRowPolicy";
+  public static final ParseErrorPolicy DEFAULT_UNMATCHED_ROW_POLICY = ParseErrorPolicy.WARN;
+
+  private KuduTable table;
+  private Pattern pattern;
+  private Charset charset;
+  private String operation;
+  private ParseErrorPolicy missingColumnPolicy;
+  private ParseErrorPolicy badColumnValuePolicy;
+  private ParseErrorPolicy unmatchedRowPolicy;
+
+  public RegexpKuduOperationsProducer() {
+  }
+
+  @Override
+  public void configure(Context context) {
+    String regexp = context.getString(PATTERN_PROP);
+    Preconditions.checkArgument(regexp != null,
+        "Required parameter %s is not specified",
+        PATTERN_PROP);
+    try {
+      pattern = Pattern.compile(regexp);
+    } catch (PatternSyntaxException e) {
+      throw new IllegalArgumentException(
+          String.format("The pattern '%s' is invalid", regexp), e);
+    }
+    String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING);
+    try {
+      charset = Charset.forName(charsetName);
+    } catch (IllegalArgumentException e) {
+      throw new FlumeException(
+          String.format("Invalid or unsupported charset %s", charsetName), e);
+    }
+    operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase(Locale.ENGLISH);
+    Preconditions.checkArgument(
+        validOperations.contains(operation),
+        "Unrecognized operation '%s'",
+        operation);
+
+
+    missingColumnPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
+      context, SKIP_MISSING_COLUMN_PROP, MISSING_COLUMN_POLICY_PROP,
+      ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_MISSING_COLUMN_POLICY
+    );
+
+    badColumnValuePolicy = getParseErrorPolicyCheckingDeprecatedProperty(
+      context, SKIP_BAD_COLUMN_VALUE_PROP, BAD_COLUMN_VALUE_POLICY_PROP,
+      ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_BAD_COLUMN_VALUE_POLICY
+    );
+
+    unmatchedRowPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
+      context, WARN_UNMATCHED_ROWS_PROP, UNMATCHED_ROW_POLICY_PROP,
+      ParseErrorPolicy.WARN, ParseErrorPolicy.IGNORE, DEFAULT_UNMATCHED_ROW_POLICY
+    );
+  }
+
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    String raw = new String(event.getBody(), charset);
+    Matcher m = pattern.matcher(raw);
+    boolean match = false;
+    Schema schema = table.getSchema();
+    List<Operation> ops = Lists.newArrayList();
+    while (m.find()) {
+      match = true;
+      Operation op;
+      switch (operation) {
+        case UPSERT:
+          op = table.newUpsert();
+          break;
+        case INSERT:
+          op = table.newInsert();
+          break;
+        default:
+          throw new FlumeException(
+              String.format("Unrecognized operation type '%s' in getOperations(): " +
+                  "this should never happen!", operation));
+      }
+      PartialRow row = op.getRow();
+      for (ColumnSchema col : schema.getColumns()) {
+        try {
+          coerceAndSet(m.group(col.getName()), col.getName(), col.getType(), row);
+        } catch (NumberFormatException e) {
+          String msg = String.format(
+              "Raw value '%s' couldn't be parsed to type %s for column '%s'",
+              raw, col.getType(), col.getName());
+          logOrThrow(badColumnValuePolicy, msg, e);
+        } catch (IllegalArgumentException e) {
+          String msg = String.format(
+              "Column '%s' has no matching group in '%s'",
+              col.getName(), raw);
+          logOrThrow(missingColumnPolicy, msg, e);
+        } catch (Exception e) {
+          throw new FlumeException("Failed to create Kudu operation", e);
+        }
+      }
+      ops.add(op);
+    }
+    if (!match) {
+      String msg = String.format("Failed to match the pattern '%s' in '%s'", pattern, raw);
+      logOrThrow(unmatchedRowPolicy, msg, null);
+    }
+    return ops;
+  }
+
+  /**
+   * Coerces the string `rawVal` to the type `type` and sets the resulting
+   * value for column `colName` in `row`.
+   *
+   * @param rawVal the raw string column value
+   * @param colName the name of the column
+   * @param type the Kudu type to convert `rawVal` to
+   * @param row the row to set the value in
+   * @throws NumberFormatException if `rawVal` cannot be cast as `type`.
+   */
+  private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row)
+      throws NumberFormatException {
+    switch (type) {
+      case BOOL:
+        row.addBoolean(colName, Boolean.parseBoolean(rawVal));
+        break;
+      case INT8:
+        row.addByte(colName, Byte.parseByte(rawVal));
+        break;
+      case INT16:
+        row.addShort(colName, Short.parseShort(rawVal));
+        break;
+      case INT32:
+        row.addInt(colName, Integer.parseInt(rawVal));
+        break;
+      case INT64: // Fall through
+      case UNIXTIME_MICROS:
+        row.addLong(colName, Long.parseLong(rawVal));
+        break;
+      case FLOAT:
+        row.addFloat(colName, Float.parseFloat(rawVal));
+        break;
+      case DOUBLE:
+        row.addDouble(colName, Double.parseDouble(rawVal));
+        break;
+      case BINARY:
+        row.addBinary(colName, rawVal.getBytes(charset));
+        break;
+      case STRING:
+        row.addString(colName, rawVal);
+        break;
+      default:
+        logger.warn("got unknown type {} for column '{}'-- ignoring this column",
+            type, colName);
+    }
+  }
+
+  private void logOrThrow(ParseErrorPolicy policy, String msg, Exception e)
+      throws FlumeException {
+    switch (policy) {
+      case REJECT:
+        throw new FlumeException(msg, e);
+      case WARN:
+        logger.warn(msg, e);
+        break;
+      case IGNORE:
+        // Fall through
+      default:
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  private ParseErrorPolicy getParseErrorPolicyCheckingDeprecatedProperty(
+      Context context, String deprecatedPropertyName, String newPropertyName,
+      ParseErrorPolicy trueValue, ParseErrorPolicy falseValue, ParseErrorPolicy defaultValue) {
+    ParseErrorPolicy policy;
+    if (context.containsKey(deprecatedPropertyName)) {
+      logger.info("Configuration property {} is deprecated. Use {} instead.",
+          deprecatedPropertyName, newPropertyName);
+      Preconditions.checkArgument(!context.containsKey(newPropertyName), // Guava expands %s only
+          "Both %s and %s specified. Use only one of them, preferably %s.",
+          deprecatedPropertyName, newPropertyName, newPropertyName);
+      policy = context.getBoolean(deprecatedPropertyName) ? trueValue : falseValue;
+    } else {
+      String policyString = context.getString(newPropertyName, defaultValue.name());
+      try {
+        policy = ParseErrorPolicy.valueOf(policyString.toUpperCase(Locale.ENGLISH));
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgumentException(
+          "Unknown policy '" + policyString + "'. Use one of the following: " +
+              Arrays.toString(ParseErrorPolicy.values()), e);
+      }
+    }
+
+    return policy;
+  }
+
+  private enum ParseErrorPolicy {
+    WARN,
+    IGNORE,
+    REJECT
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKeyedKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKeyedKuduOperationsProducer.java
new file mode 100644
index 0000000..5287927
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKeyedKuduOperationsProducer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Upsert;
+
+/**
+ * A simple serializer that generates one {@link Insert} or {@link Upsert}
+ * per {@link Event} by writing the event body into a BINARY column. The pair
+ * (key column name, key column value) should be a header in the {@link Event};
+ * the column name is configurable but the column type must be STRING. Multiple
+ * key columns are not supported.
+ *
+ * <p><strong>Simple Keyed Kudu Operations Producer configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1
+ *        summary="Simple Keyed Kudu Operations Producer configuration parameters">
+ * <tr>
+ *   <th>Property Name</th>
+ *   <th>Default</th>
+ *   <th>Required?</th>
+ *   <th>Description</th>
+ * </tr>
+ * <tr>
+ *   <td>producer.payloadColumn</td>
+ *   <td>payload</td>
+ *   <td>No</td>
+ *   <td>The name of the BINARY column to write the Flume event body to.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.keyColumn</td>
+ *   <td>key</td>
+ *   <td>No</td>
+ *   <td>The name of the STRING key column of the target Kudu table.</td>
+ * </tr>
+ * <tr>
+ *   <td>producer.operation</td>
+ *   <td>upsert</td>
+ *   <td>No</td>
+ *   <td>The operation used to write events to Kudu. Supported operations
+ *   are 'insert' and 'upsert'</td>
+ * </tr>
+ * </table>
+ */
+public class SimpleKeyedKuduOperationsProducer implements KuduOperationsProducer {
+  public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
+  public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
+  public static final String KEY_COLUMN_PROP = "keyColumn";
+  public static final String KEY_COLUMN_DEFAULT = "key";
+  public static final String OPERATION_PROP = "operation";
+  public static final String OPERATION_DEFAULT = "upsert";
+
+  private KuduTable table;
+  private String payloadColumn;
+  private String keyColumn;
+  private String operation = "";
+
+  public SimpleKeyedKuduOperationsProducer(){
+  }
+
+  @Override
+  public void configure(Context context) {
+    payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
+    keyColumn = context.getString(KEY_COLUMN_PROP, KEY_COLUMN_DEFAULT);
+    operation = context.getString(OPERATION_PROP, OPERATION_DEFAULT);
+  }
+
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    String key = event.getHeaders().get(keyColumn);
+    if (key == null) {
+      throw new FlumeException(
+          String.format("No value provided for key column %s", keyColumn));
+    }
+    try {
+      Operation op;
+      switch (operation.toLowerCase(Locale.ENGLISH)) {
+        case "upsert":
+          op = table.newUpsert();
+          break;
+        case "insert":
+          op = table.newInsert();
+          break;
+        default:
+          throw new FlumeException(
+              String.format("Unexpected operation %s", operation));
+      }
+      PartialRow row = op.getRow();
+      row.addString(keyColumn, key);
+      row.addBinary(payloadColumn, event.getBody());
+
+      return Collections.singletonList(op);
+    } catch (Exception e) {
+      throw new FlumeException("Failed to create Kudu Operation object", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
+
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKuduOperationsProducer.java
new file mode 100644
index 0000000..70c91b8
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKuduOperationsProducer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+/**
+ * A simple serializer that generates one {@link Insert} per {@link Event}
+ * by writing the event body into a BINARY column. The headers are discarded.
+ *
+ * <p><strong>Simple Kudu Event Producer configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1
+ *        summary="Simple Kudu Event Producer configuration parameters">
+ * <tr>
+ *   <th>Property Name</th>
+ *   <th>Default</th>
+ *   <th>Required?</th>
+ *   <th>Description</th>
+ * </tr>
+ * <tr>
+ *   <td>producer.payloadColumn</td>
+ *   <td>payload</td>
+ *   <td>No</td>
+ *   <td>The name of the BINARY column to write the Flume event body to.</td>
+ * </tr>
+ * </table>
+ */
+public class SimpleKuduOperationsProducer implements KuduOperationsProducer {
+  public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
+  public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
+
+  private KuduTable table;
+  private String payloadColumn;
+
+  public SimpleKuduOperationsProducer() {
+  }
+
+  @Override
+  public void configure(Context context) {
+    payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
+  }
+
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    try {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addBinary(payloadColumn, event.getBody());
+
+      return Collections.singletonList((Operation) insert);
+    } catch (Exception e) {
+      throw new FlumeException("Failed to create Kudu Insert object", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/KuduSinkTestUtil.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/KuduSinkTestUtil.java
new file mode 100644
index 0000000..253657e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/KuduSinkTestUtil.java
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.flume.sink.kudu;
+
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.KuduClient;
+
+class KuduSinkTestUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTestUtil.class);
+
+  static KuduSink createSink(KuduClient client, String tableName, Context ctx) {
+    return createSink(tableName, client, ctx, client.getMasterAddressesAsString());
+  }
+
+  private static KuduSink createSink(
+      String tableName, KuduClient client, Context ctx, String masterAddresses) {
+    LOG.info("Creating Kudu sink for '{}' table...", tableName);
+
+    Context context = new Context();
+    context.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
+    context.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, masterAddresses);
+    context.putAll(ctx.getParameters());
+    KuduSink sink = new KuduSink(client);
+    Configurables.configure(sink, context);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+
+    LOG.info("Created Kudu sink for '{}' table.", tableName);
+
+    return sink;
+  }
+
+  static KuduSink createSecureSink(String tableName, String masterAddresses, String clusterRoot) {
+    Context context = new Context();
+    context.put(KuduSinkConfigurationConstants.KERBEROS_KEYTAB, clusterRoot +
+        "/krb5kdc/test-user.keytab");
+    context.put(KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL, "test-user@KRBTEST.COM");
+
+    return createSink(tableName, null, context, masterAddresses);
+  }
+
+  static void processEventsCreatingSink(
+      KuduClient syncClient, Context context, String tableName, List<Event> events)
+      throws EventDeliveryException {
+    KuduSink sink = createSink(syncClient, tableName, context);
+    sink.start();
+    processEvents(sink, events);
+  }
+
+  static void processEvents(KuduSink sink, List<Event> events) throws EventDeliveryException {
+    Channel channel = sink.getChannel();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (Event e : events) {
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+    // JUnit order is (message, expected, actual): BACKOFF is the reference value, status is observed.
+    Status status = sink.process();
+    if (events.isEmpty()) {
+      assertSame("incorrect status for empty channel", Status.BACKOFF, status);
+    } else {
+      assertNotSame("incorrect status for non-empty channel", Status.BACKOFF, status);
+    }
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestAvroKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestAvroKuduOperationsProducer.java
new file mode 100644
index 0000000..04cc676
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestAvroKuduOperationsProducer.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.test.KuduTestHarness;
+
+/**
+ * Tests {@link AvroKuduOperationsProducer} by sending Avro-encoded events through a
+ * {@link KuduSink} and comparing the scanned table rows with the expected row strings.
+ *
+ * <p>The schema reaches the producer in one of three ways (see {@link SchemaLocation}): via the
+ * producer configuration, via a schema URL header on each event, or via a schema literal
+ * header on each event.
+ */
+public class TestAvroKuduOperationsProducer {
+  private static String schemaUriString;
+  private static String schemaLiteral;
+
+  // Load the test schema once; the tests need both its URI and its text.
+  static {
+    String resourceName = "/testAvroKuduOperationsProducer.avsc";
+    try {
+      URL resource = TestAvroKuduOperationsProducer.class.getResource(resourceName);
+      File avsc = Paths.get(resource.toURI()).toFile();
+      schemaLiteral = Files.toString(avsc, UTF_8);
+      schemaUriString = avsc.getAbsoluteFile().toURI().toString();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Selects how a test supplies the Avro schema. */
+  enum SchemaLocation {
+    GLOBAL, URL, LITERAL
+  }
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Test
+  public void testEmptyChannel() throws Exception {
+    testEvents(0, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testOneEvent() throws Exception {
+    testEvents(1, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testThreeEvents() throws Exception {
+    testEvents(3, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testThreeEventsSchemaURLInEvent() throws Exception {
+    testEvents(3, SchemaLocation.URL);
+  }
+
+  @Test
+  public void testThreeEventsSchemaLiteralInEvent() throws Exception {
+    testEvents(3, SchemaLocation.LITERAL);
+  }
+
+  /**
+   * Creates a fresh table, sends {@code eventCount} generated events through a new sink and
+   * checks that the scanned rows equal the expected answers.
+   */
+  private void testEvents(int eventCount, SchemaLocation schemaLocation)
+      throws Exception {
+    String requestedName = String.format("test%sevents%s", eventCount, schemaLocation);
+    KuduTable table = createNewTable(requestedName);
+    String tableName = table.getName();
+
+    // Only the GLOBAL case puts the schema URI into the producer configuration.
+    Context context;
+    if (schemaLocation == SchemaLocation.GLOBAL) {
+      String schemaKey = KuduSinkConfigurationConstants.PRODUCER_PREFIX +
+          AvroKuduOperationsProducer.SCHEMA_PROP;
+      context = new Context(ImmutableMap.of(schemaKey, schemaUriString));
+    } else {
+      context = new Context();
+    }
+    context.put(KuduSinkConfigurationConstants.PRODUCER,
+        AvroKuduOperationsProducer.class.getName());
+
+    List<Event> events = generateEvents(eventCount, schemaLocation);
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
+
+    List<String> expectedRows = makeAnswers(eventCount);
+    List<String> actualRows = scanTableToStrings(table);
+    assertEquals("wrong number of rows inserted", expectedRows.size(), actualRows.size());
+    assertArrayEquals("wrong rows inserted", expectedRows.toArray(), actualRows.toArray());
+  }
+
+  /** Starts a column builder for the given name and type. */
+  private static ColumnSchema.ColumnSchemaBuilder column(String name, Type type) {
+    return new ColumnSchema.ColumnSchemaBuilder(name, type);
+  }
+
+  /** Creates the five-column test table, range-partitioned on "key" with one replica. */
+  private KuduTable createNewTable(String tableName) throws Exception {
+    List<ColumnSchema> columns = new ArrayList<>(5);
+    columns.add(column("key", Type.INT32).key(true).build());
+    columns.add(column("longField", Type.INT64).build());
+    columns.add(column("doubleField", Type.DOUBLE).build());
+    columns.add(column("nullableField", Type.STRING).nullable(true).build());
+    columns.add(column("stringField", Type.STRING).build());
+
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of("key"));
+    createOptions.setNumReplicas(1);
+
+    return harness.getClient().createTable(tableName, new Schema(columns), createOptions);
+  }
+
+  /**
+   * Builds {@code eventCount} events whose bodies are binary-encoded test records. For the URL
+   * and LITERAL locations each event also carries the matching schema header.
+   */
+  private List<Event> generateEvents(int eventCount,
+                                     SchemaLocation schemaLocation) throws Exception {
+    List<Event> events = new ArrayList<>();
+    for (int i = 0; i < eventCount; i++) {
+      AvroKuduOperationsProducerTestRecord record = new AvroKuduOperationsProducerTestRecord();
+      record.setKey(10 * i);
+      record.setLongField(2L * i);
+      record.setDoubleField(2.71828 * i);
+      record.setNullableField(i % 2 == 0 ? null : "taco");
+      record.setStringField(String.format("hello %d", i));
+
+      Event event = EventBuilder.withBody(serialize(record));
+      if (schemaLocation == SchemaLocation.URL) {
+        event.setHeaders(ImmutableMap.of(
+            AvroKuduOperationsProducer.SCHEMA_URL_HEADER, schemaUriString));
+      } else if (schemaLocation == SchemaLocation.LITERAL) {
+        event.setHeaders(ImmutableMap.of(
+            AvroKuduOperationsProducer.SCHEMA_LITERAL_HEADER, schemaLiteral));
+      }
+      events.add(event);
+    }
+    return events;
+  }
+
+  /** Encodes one test record with Avro's binary encoder. */
+  private static byte[] serialize(AvroKuduOperationsProducerTestRecord record) throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+    DatumWriter<AvroKuduOperationsProducerTestRecord> writer =
+        new SpecificDatumWriter<>(AvroKuduOperationsProducerTestRecord.class);
+    writer.write(record, encoder);
+    encoder.flush();
+    return out.toByteArray();
+  }
+
+  /** Returns the sorted row strings expected for the records built by generateEvents. */
+  private List<String> makeAnswers(int eventCount) {
+    String rowFormat = "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " +
+        "STRING nullableField=%s, STRING stringField=hello %s";
+    List<String> expected = Lists.newArrayList();
+    for (int i = 0; i < eventCount; i++) {
+      int key = 10 * i;
+      int longField = 2 * i;
+      double doubleField = 2.71828 * i;
+      String nullableField = (i % 2 == 0) ? "NULL" : "taco";
+      expected.add(String.format(rowFormat, key, longField, doubleField, nullableField, i));
+    }
+    Collections.sort(expected);
+    return expected;
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKeyedKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKeyedKuduOperationsProducer.java
new file mode 100644
index 0000000..a7cccf5
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKeyedKuduOperationsProducer.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.kudu;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.test.KuduTestHarness;
+
+/**
+ * Tests {@link SimpleKeyedKuduOperationsProducer} through a {@link KuduSink}, using both the
+ * "insert" and the "upsert" operation.
+ *
+ * <p>Each event carries its row key in a header named after
+ * {@link SimpleKeyedKuduOperationsProducer#KEY_COLUMN_DEFAULT}, the same constant that names
+ * the key column of the test table, so the two cannot drift apart.
+ */
+public class TestKeyedKuduOperationsProducer {
+  private static final Logger LOG = LoggerFactory.getLogger(TestKeyedKuduOperationsProducer.class);
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  /** Creates a table with a STRING key column and a BINARY payload column, one replica. */
+  private KuduTable createNewTable(String tableName) throws Exception {
+    LOG.info("Creating new table...");
+
+    List<ColumnSchema> columns = new ArrayList<>(2);
+    columns.add(
+        new ColumnSchema.ColumnSchemaBuilder(
+            SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT, Type.STRING)
+            .key(true).build());
+    columns.add(
+        new ColumnSchema.ColumnSchemaBuilder(
+            SimpleKeyedKuduOperationsProducer.PAYLOAD_COLUMN_DEFAULT, Type.BINARY)
+            .key(false).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions()
+            .setRangePartitionColumns(ImmutableList.of(
+                SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT))
+            .setNumReplicas(1);
+    KuduTable table =
+        harness.getClient().createTable(tableName, new Schema(columns), createOptions);
+
+    LOG.info("Created new table.");
+
+    return table;
+  }
+
+  @Test
+  public void testEmptyChannelWithInsert() throws Exception {
+    testEvents(0, "insert");
+  }
+
+  @Test
+  public void testOneEventWithInsert() throws Exception {
+    testEvents(1, "insert");
+  }
+
+  @Test
+  public void testThreeEventsWithInsert() throws Exception {
+    testEvents(3, "insert");
+  }
+
+  @Test
+  public void testEmptyChannelWithUpsert() throws Exception {
+    testEvents(0, "upsert");
+  }
+
+  @Test
+  public void testOneEventWithUpsert() throws Exception {
+    testEvents(1, "upsert");
+  }
+
+  @Test
+  public void testThreeEventsWithUpsert() throws Exception {
+    testEvents(3, "upsert");
+  }
+
+  /**
+   * Writes three rows, then sends a second event for the first key and checks that the row
+   * count is unchanged while the first row's payload has been replaced.
+   */
+  @Test
+  public void testDuplicateRowsWithUpsert() throws Exception {
+    LOG.info("Testing events with upsert...");
+
+    KuduTable table = createNewTable("testDupUpsertEvents");
+    String tableName = table.getName();
+    KuduSink sink =
+        KuduSinkTestUtil.createSink(harness.getClient(), tableName, producerContext("upsert"));
+    sink.start();
+
+    int numRows = 3;
+    KuduSinkTestUtil.processEvents(sink, getEvents(numRows));
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(numRows + " row(s) expected", numRows, rows.size());
+    assertPayloads(rows, 0, numRows);
+
+    Event dup = keyedEvent(String.format("key %s", 0), "payload body upserted");
+    KuduSinkTestUtil.processEvents(sink, ImmutableList.of(dup));
+
+    List<String> upRows = scanTableToStrings(table);
+    assertEquals(numRows + " row(s) expected", numRows, upRows.size());
+
+    assertTrue("incorrect payload", upRows.get(0).contains("payload body upserted"));
+    assertPayloads(upRows, 1, numRows);
+
+    LOG.info("Testing events with upsert finished successfully.");
+  }
+
+  /**
+   * Creates a fresh table, sends {@code eventCount} keyed events through a new sink configured
+   * for {@code operation}, and checks the number and payloads of the scanned rows.
+   */
+  private void testEvents(int eventCount, String operation) throws Exception {
+    LOG.info("Testing {} events...", eventCount);
+
+    KuduTable table = createNewTable("test" + eventCount + "events" + operation);
+    String tableName = table.getName();
+    Context context = producerContext(operation);
+
+    List<Event> events = getEvents(eventCount);
+
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+    assertPayloads(rows, 0, eventCount);
+
+    LOG.info("Testing {} events finished successfully.", eventCount);
+  }
+
+  /** Builds the sink context selecting the keyed producer and the given operation. */
+  private static Context producerContext(String operation) {
+    return new Context(ImmutableMap.of(
+        KuduSinkConfigurationConstants.PRODUCER_PREFIX +
+            SimpleKeyedKuduOperationsProducer.OPERATION_PROP, operation,
+        KuduSinkConfigurationConstants.PRODUCER,
+        SimpleKeyedKuduOperationsProducer.class.getName()
+    ));
+  }
+
+  /** Asserts that rows[from..to) each contain "payload body " followed by their index. */
+  private static void assertPayloads(List<String> rows, int from, int to) {
+    for (int i = from; i < to; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+  }
+
+  /** Creates an event with a UTF-8 body and the row key in the key-column header. */
+  private static Event keyedEvent(String key, String payload) {
+    Event event = EventBuilder.withBody(payload.getBytes(UTF_8));
+    event.setHeaders(
+        ImmutableMap.of(SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT, key));
+    return event;
+  }
+
+  /** Returns events "key i" / "payload body i" for i in [0, eventCount). */
+  private List<Event> getEvents(int eventCount) {
+    List<Event> events = new ArrayList<>();
+    for (int i = 0; i < eventCount; i++) {
+      events.add(keyedEvent(String.format("key %s", i), String.format("payload body %s", i)));
+    }
+    return events;
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKuduSink.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKuduSink.java
new file mode 100644
index 0000000..c242262
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKuduSink.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.kudu;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+
+/**
+ * Tests {@link KuduSink} without configuring an operations producer: mandatory configuration,
+ * a missing table, plain event delivery, and duplicate-row handling.
+ */
+public class TestKuduSink {
+  private static final Logger LOG = LoggerFactory.getLogger(TestKuduSink.class);
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  /** Creates a table whose only column, "payload" (BINARY), is also its key. */
+  private KuduTable createNewTable(String tableName) throws Exception {
+    LOG.info("Creating new table...");
+
+    List<ColumnSchema> columns = new ArrayList<>(1);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
+                                .setNumReplicas(1);
+    KuduTable table = harness.getClient().createTable(tableName, new Schema(columns),
+        createOptions);
+
+    LOG.info("Created new table.");
+
+    return table;
+  }
+
+  /**
+   * Configuring the sink must fail with a NullPointerException both when no property is set
+   * and when only the table name is set.
+   */
+  @Test
+  public void testMandatoryParameters() {
+    LOG.info("Testing mandatory parameters...");
+
+    KuduSink sink = new KuduSink(harness.getClient());
+
+    HashMap<String, String> parameters = new HashMap<>();
+    Context context = new Context(parameters);
+    try {
+      Configurables.configure(sink, context);
+      Assert.fail("Should have failed due to missing properties");
+    } catch (NullPointerException expected) {
+      // good
+    }
+
+    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "tableName");
+    context = new Context(parameters);
+    try {
+      Configurables.configure(sink, context);
+      Assert.fail("Should have failed due to missing properties");
+    } catch (NullPointerException expected) {
+      // good
+    }
+
+    LOG.info("Testing mandatory parameters finished successfully.");
+  }
+
+  /** A sink pointed at a table that was never created must raise a FlumeException. */
+  @Test(expected = FlumeException.class)
+  public void testMissingTable() {
+    LOG.info("Testing missing table...");
+
+    KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), "missingTable",
+        new Context());
+    // No statement follows: when the test passes, the expected exception has already left
+    // this method, so anything after this call would never run.
+    sink.start();
+  }
+
+  @Test
+  public void testEmptyChannelWithDefaults() throws Exception {
+    testEventsWithDefaults(0);
+  }
+
+  @Test
+  public void testOneEventWithDefaults() throws Exception {
+    testEventsWithDefaults(1);
+  }
+
+  @Test
+  public void testThreeEventsWithDefaults() throws Exception {
+    testEventsWithDefaults(3);
+  }
+
+  @Test
+  public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
+    doTestDuplicateRows(true);
+  }
+
+  @Test
+  public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
+    doTestDuplicateRows(false);
+  }
+
+  /**
+   * Sends two events with the same key through the sink.
+   *
+   * <p>With {@code ignoreDuplicateRows} set, {@code process()} must succeed with
+   * {@code Status.READY} and leave exactly one row. Without it, {@code process()} must throw
+   * an {@link EventDeliveryException}.
+   */
+  private void doTestDuplicateRows(boolean ignoreDuplicateRows) throws Exception {
+    KuduTable table = createNewTable("testDuplicateRows" + ignoreDuplicateRows);
+    String tableName = table.getName();
+    Context sinkContext = new Context();
+    sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
+                    Boolean.toString(ignoreDuplicateRows));
+    KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName, sinkContext);
+    sink.start();
+    Channel channel = sink.getChannel();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    for (int i = 0; i < 2; i++) {
+      Event e = EventBuilder.withBody("key-0", UTF_8); // Duplicate keys.
+      channel.put(e);
+    }
+
+    tx.commit();
+    tx.close();
+
+    Sink.Status status;
+    try {
+      status = sink.process();
+    } catch (EventDeliveryException e) {
+      if (ignoreDuplicateRows) {
+        throw new AssertionError("Failed to ignore duplicate rows!", e);
+      }
+      LOG.info("Correctly did not ignore duplicate rows", e);
+      return;
+    }
+
+    // We only get here if the process() succeeded.
+    if (!ignoreDuplicateRows) {
+      fail("Incorrectly ignored duplicate rows!");
+    }
+    // The channel held two events here; JUnit's order is (message, expected, actual).
+    assertSame("incorrect status for non-empty channel", Status.READY, status);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals("1 row expected", 1, rows.size());
+
+    LOG.info("Testing duplicate events finished successfully.");
+  }
+
+  /**
+   * Creates a fresh table, sends {@code eventCount} events through a new sink built from an
+   * empty context, and checks the number and payloads of the scanned rows.
+   */
+  private void testEventsWithDefaults(int eventCount) throws Exception {
+    LOG.info("Testing {} events...", eventCount);
+
+    KuduTable table = createNewTable("test" + eventCount + "events");
+    String tableName = table.getName();
+
+    List<Event> events = new ArrayList<>();
+
+    for (int i = 0; i < eventCount; i++) {
+      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
+      events.add(e);
+    }
+
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), new Context(), tableName,
+        events);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+    for (int i = 0; i < eventCount; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+
+    LOG.info("Testing {} events finished successfully.", eventCount);
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducer.java
new file mode 100644
index 0000000..f8e9c41
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducer.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.kudu;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+
+public class TestRegexpKuduOperationsProducer {
+  private static final String TEST_REGEXP =
+      "(?<key>\\d+),(?<byteFld>\\d+),(?<shortFld>\\d+),(?<intFld>\\d+)," +
+      "(?<longFld>\\d+),(?<binaryFld>\\w+),(?<stringFld>\\w+),(?<boolFld>\\w+)," +
+      "(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+.\\d*)";
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  private KuduTable createNewTable(String tableName) throws Exception {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(10);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("byteFld", Type.INT8).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("shortFld", Type.INT16).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("intFld", Type.INT32).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("longFld", Type.INT64).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("binaryFld", Type.BINARY).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("boolFld", Type.BOOL).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("floatFld", Type.FLOAT).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleFld", Type.DOUBLE).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
+    return harness.getClient().createTable(tableName, new Schema(columns), createOptions);
+  }
+
+  @Test
+  public void testEmptyChannel() throws Exception {
+    testEvents(0, 1, "insert");
+  }
+
+  @Test
+  public void testOneEvent() throws Exception {
+    testEvents(1, 1, "insert");
+  }
+
+  @Test
+  public void testThreeEvents() throws Exception {
+    testEvents(3, 1, "insert");
+  }
+
+  @Test
+  public void testThreeEventsWithUpsert() throws Exception {
+    testEvents(3, 1, "upsert");
+  }
+
+  @Test
+  public void testOneEventTwoRowsEach() throws Exception {
+    testEvents(1, 2, "insert");
+  }
+
+  @Test
+  public void testTwoEventsTwoRowsEach() throws Exception {
+    testEvents(2, 2, "insert");
+  }
+
+  @Test
+  public void testTwoEventsTwoRowsEachWithUpsert() throws Exception {
+    testEvents(2, 2, "upsert");
+  }
+
+  private void testEvents(int eventCount, int perEventRowCount, String operation) throws Exception {
+    String tableName = String.format("test%sevents%srowseach%s",
+        eventCount, perEventRowCount, operation);
+    Context context = new Context();
+    context.put(KuduSinkConfigurationConstants.PRODUCER_PREFIX +
+        RegexpKuduOperationsProducer.PATTERN_PROP, TEST_REGEXP);
+    context.put(KuduSinkConfigurationConstants.PRODUCER_PREFIX +
+        RegexpKuduOperationsProducer.OPERATION_PROP, operation);
+    context.put(KuduSinkConfigurationConstants.PRODUCER,
+        RegexpKuduOperationsProducer.class.getName());
+    KuduTable table = createNewTable(tableName);
+
+    List<Event> events = generateEvents(eventCount, perEventRowCount, operation);
+
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount * perEventRowCount + " row(s) expected",
+        eventCount * perEventRowCount,
+        rows.size());
+
+    ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount);
+    for (int i = 0; i < eventCount; i++) {
+      for (int j = 0; j < perEventRowCount; j++) {
+        int value = operation.equals("upsert") && i == 0 ? 1 : i;
+        String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " +
+            "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " +
+            "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " +
+            "DOUBLE doubleFld=%1$d.%1$d";
+        String rightAnswer = String.format(baseAnswer, value, i, j);
+        rightAnswers.add(rightAnswer);
+      }
+    }
+    Collections.sort(rightAnswers);
+
+    for (int k = 0; k < eventCount * perEventRowCount; k++) {
+      assertEquals("incorrect row", rightAnswers.get(k), rows.get(k));
+    }
+  }
+
+  private List<Event> generateEvents(int eventCount, int perEventRowCount, String operation) {
+    List<Event> events = new ArrayList<>();
+
+    for (int i = 0; i < eventCount; i++) {
+      StringBuilder payload = new StringBuilder();
+      for (int j = 0; j < perEventRowCount; j++) {
+        String baseRow = "|1%1$d%2$d1,%1$d,%1$d,%1$d,%1$d,binary," +
+            "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d|";
+        String row = String.format(baseRow, i, j);
+        payload.append(row);
+      }
+      Event e = EventBuilder.withBody(payload.toString().getBytes(UTF_8));
+      events.add(e);
+    }
+
+    if (eventCount > 0) {
+      // In the upsert case, add one upsert row per insert event (i.e. per i)
+      // All such rows go in one event.
+      if (operation.equals("upsert")) {
+        StringBuilder upserts = new StringBuilder();
+        for (int j = 0; j < perEventRowCount; j++) {
+          String row = String.format("|1%2$d%3$d1,%1$d,%1$d,%1$d,%1$d,binary," +
+              "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d|", 1, 0, j);
+          upserts.append(row);
+        }
+        Event e = EventBuilder.withBody(upserts.toString().getBytes(UTF_8));
+        events.add(e);
+      }
+
+      // Also check some bad/corner cases.
+      String mismatchInInt = "|1,2,taco,4,5,x,y,true,1.0.2.0,999|";
+      String emptyString = "";
+      String[] testCases = {mismatchInInt, emptyString};
+      for (String testCase : testCases) {
+        Event e = EventBuilder.withBody(testCase.getBytes(UTF_8));
+        events.add(e);
+      }
+    }
+    return events;
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducerParseError.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducerParseError.java
new file mode 100644
index 0000000..e4af76f
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducerParseError.java
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.flume.sink.kudu;
+
+import static org.apache.flume.sink.kudu.RegexpKuduOperationsProducer.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.RuleChain;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.test.CapturingLogAppender;
+
+public class TestRegexpKuduOperationsProducerParseError {
+  private static final String TEST_REGEXP = "(?<key>\\d+),(?<byteFld>\\d+),(?<stringFld>\\w+)";
+  private static final String TEST_REGEXP_MISSING_COLUMN = "(?<key>\\d+),(?<byteFld>\\d+)";
+  private static final String TEST_OPERATION = "insert";
+  // Event bodies used to provoke each kind of parse error.
+  private static final String ROW_UNMATCHING = "invalid row";
+  private static final String ROW_BAD_COLUMN_VALUE = "1,1000,string";
+  private static final String ROW_MISSING_COLUMN = "1,1";
+  // Messages the tests look for in the thrown exception or in the captured log text.
+  private static final String ERROR_MSG_UNMATCHED_ROW =
+      "Failed to match the pattern '" + TEST_REGEXP + "' in '" + ROW_UNMATCHING + "'";
+  private static final String ERROR_MSG_MISSING_COLUMN =
+      "Column 'stringFld' has no matching group in '" + ROW_MISSING_COLUMN + "'";
+  private static final String ERROR_MSG_BAD_COLUMN_VALUE =
+      "Raw value '" + ROW_BAD_COLUMN_VALUE +
+          "' couldn't be parsed to type Type: int8 for column 'byteFld'";
+  // Policy values the tests put into the *_POLICY_PROP settings.
+  private static final String POLICY_REJECT = "REJECT";
+  private static final String POLICY_WARN = "WARN";
+  private static final String POLICY_IGNORE = "IGNORE";
+
+  public KuduTestHarness harness = new KuduTestHarness();
+  public ExpectedException thrown = ExpectedException.none();
+
+  // ExpectedException misbehaves when combined with other rules; we use a
+  // RuleChain to beat it into submission.
+  //
+  // See https://stackoverflow.com/q/28846088 for more information.
+  @Rule
+  public RuleChain chain = RuleChain.outerRule(harness).around(thrown);
+
+  @Test
+  public void testMissingColumnThrowsExceptionDefaultConfig() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnThrowsExceptionDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(false));
+    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnThrowsException() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_REJECT);
+    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnLogsWarningDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(true));
+    testLogging(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnLogsWarning() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_WARN);
+    testLogging(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  // IGNORE policy: the missing column must neither throw nor show up in the captured log.
+  @Test
+  public void testMissingColumnIgnored() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_IGNORE);
+    testIgnored(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMissingColumnConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(false));
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_IGNORE);
+    getProducer(additionalContext);
+  }
+
+  @Test
+  public void testBadColumnValueThrowsExceptionDefaultConfig() throws Exception {
+    Context additionalContext = new Context();
+    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueThrowsExceptionDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(false));
+    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueThrowsException() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_REJECT);
+    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueLogsWarningDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(true));
+    testLogging(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueLogsWarning() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_WARN);
+    testLogging(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueIgnored() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_IGNORE);
+    testIgnored(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadColumnValueConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(false));
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_IGNORE);
+    getProducer(additionalContext);
+  }
+
+  @Test
+  public void testUnmatchedRowLogsWarningWithDefaultConfig() throws Exception {
+    Context additionalContext = new Context();
+    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowThrowsException() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_REJECT);
+    testThrowsException(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowLogsWarningDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(true));
+    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowLogsWarning() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_WARN);
+    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowIgnoredDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(false));
+    testIgnored(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowIgnored() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_IGNORE);
+    testIgnored(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testUnmatchedRowConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(false));
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_IGNORE);
+    getProducer(additionalContext);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testUnKnownPolicyConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, "FORCED");
+    getProducer(additionalContext);
+  }
+  // Asserts that processing the event leaves expectedError in the captured log text.
+  private void testLogging(
+      Context additionalContext, String expectedError, String eventBody) throws Exception {
+    String appendedText = processEvent(additionalContext, eventBody);
+    assertTrue(appendedText.contains(expectedError));
+  }
+  // Asserts that processing the event does not leave expectedError in the captured log text.
+  private void testIgnored(
+      Context additionalContext, String expectedError, String eventBody) throws Exception {
+    String appendedText = processEvent(additionalContext, eventBody);
+    assertFalse(appendedText.contains(expectedError));
+  }
+  // Expects a FlumeException whose message contains expectedError when the event is processed.
+  private void testThrowsException(
+      Context additionalContext, String expectedError, String eventBody) throws Exception {
+    thrown.expect(FlumeException.class);
+    thrown.expectMessage(expectedError);
+    processEvent(additionalContext, eventBody);
+  }
+  // Runs eventBody through a new producer and returns the text captured by the log appender.
+  private String processEvent(Context additionalContext, String eventBody) throws Exception {
+    CapturingLogAppender appender = new CapturingLogAppender();
+    RegexpKuduOperationsProducer producer = getProducer(additionalContext);
+    try (Closeable c = appender.attach()) {
+      producer.getOperations(EventBuilder.withBody(eventBody.getBytes(Charset.forName("UTF-8"))));
+    }
+    return appender.getAppendedText();
+  }
+
+  // Builds a producer on a new "test" table; additionalContext overrides the default settings.
+  private RegexpKuduOperationsProducer getProducer(Context additionalContext) throws Exception {
+    RegexpKuduOperationsProducer producer = new RegexpKuduOperationsProducer();
+    producer.initialize(createNewTable("test"));
+    Context context = new Context();
+    context.put(PATTERN_PROP, TEST_REGEXP);
+    context.put(OPERATION_PROP, TEST_OPERATION);
+    context.putAll(additionalContext.getParameters());
+    producer.configure(context);
+
+    return producer;
+  }
+  // Creates a table with INT32 key, INT8 byteFld and STRING stringFld, hash-partitioned on key.
+  private KuduTable createNewTable(String tableName) throws Exception {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(10);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("byteFld", Type.INT8).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
+    CreateTableOptions createOptions = new CreateTableOptions()
+        .addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
+    KuduTable table =
+        harness.getClient().createTable(tableName, new Schema(columns), createOptions);
+    return table;
+  }
+
+
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestSecureKuduSink.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestSecureKuduSink.java
new file mode 100644
index 0000000..0b5b4ac
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestSecureKuduSink.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
+import static org.apache.kudu.util.SecurityUtil.KUDU_TICKETCACHE_PROPERTY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
+
+/** Checks that a secure KuduSink keeps working after its Kerberos ticket lifetime elapses. */
+public class TestSecureKuduSink {
+  private static final Logger LOG = LoggerFactory.getLogger(TestSecureKuduSink.class);
+  private static final int TICKET_LIFETIME_SECONDS = 20;
+  private static final int RENEWABLE_LIFETIME_SECONDS = 35;
+
+  private static final MiniKuduClusterBuilder clusterBuilder =
+      KuduTestHarness.getBaseClusterBuilder()
+      .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s")
+      .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s")
+      .enableKerberos();
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness(clusterBuilder);
+
+  @Before
+  public void clearTicketCacheProperty() {
+    // Let Flume authenticate.
+    System.clearProperty(KUDU_TICKETCACHE_PROPERTY);
+  }
+
+  /** Delivers half of the events, waits out the renewable lifetime, then delivers the rest. */
+  @Test
+  public void testEventsWithShortTickets() throws Exception {
+    LOG.info("Creating new table...");
+    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
+    CreateTableOptions createOptions = new CreateTableOptions()
+        .setRangePartitionColumns(ImmutableList.of("payload")).setNumReplicas(1);
+    String tableName = "test_long_lived_events";
+    KuduTable table = harness.getClient().createTable(tableName, new Schema(columns),
+        createOptions);
+    LOG.info("Created new table.");
+
+    KuduSink sink = KuduSinkTestUtil.createSecureSink(
+        tableName, harness.getMasterAddressesAsString(), harness.getClusterRoot());
+    sink.start();
+    // Taken after start() so the wait below is measured over the sink's own running time.
+    Instant sinkStarted = Instant.now();
+
+    LOG.info("Testing events at the beginning.");
+    int eventCount = 10;
+    processEvents(sink, 0, eventCount / 2);
+
+    LOG.info("Waiting for tickets to expire");
+    Duration elapsedSoFar = Duration.between(sinkStarted, Instant.now());
+    TimeUnit.MILLISECONDS.sleep(1000 * (RENEWABLE_LIFETIME_SECONDS + 1) -
+        elapsedSoFar.toMillis());
+    // At this point the sink has been running for at least (RENEWABLE_LIFETIME_SECONDS + 1)
+    // seconds, so a ticket obtained before that is no longer renewable and must be reacquired.
+
+    LOG.info("Testing events after ticket renewal.");
+    processEvents(sink, eventCount / 2, eventCount);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+    for (int i = 0; i < eventCount; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+
+    LOG.info("Testing {} events finished successfully.", eventCount);
+  }
+
+  /** Sends events "payload body from" .. "payload body (to - 1)" through the sink. */
+  private void processEvents(KuduSink sink, int from, int to) throws EventDeliveryException {
+    List<Event> events = new ArrayList<>();
+    for (int i = from; i < to; i++) {
+      events.add(EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8)));
+    }
+
+    KuduSinkTestUtil.processEvents(sink, events);
+    LOG.info("Events flushed.");
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/log4j2.xml b/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..45542ba
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<Configuration status="OFF">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d (%t) [%p - %l] %m%n" />
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Logger name="org.apache.flume" level="DEBUG"/>
+        <Logger name="org.apache.kudu" level="DEBUG"/>
+        <Root level="INFO">
+            <AppenderRef ref="Console" />
+        </Root>
+    </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/testAvroKuduOperationsProducer.avsc b/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/testAvroKuduOperationsProducer.avsc
new file mode 100644
index 0000000..d1f660e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/testAvroKuduOperationsProducer.avsc
@@ -0,0 +1,11 @@
+{"namespace": "org.apache.flume.sink.kudu",
+  "type": "record",
+  "name": "AvroKuduOperationsProducerTestRecord",
+  "fields": [
+    {"name": "key", "type": "int"},
+    {"name": "longField",  "type": "long"},
+    {"name": "doubleField",  "type": "double"},
+    {"name": "nullableField",  "type": ["string", "null"]},
+    {"name": "stringField", "type": "string"}
+  ]
+}
\ No newline at end of file
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index 99d2126..c2fd7e1 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -39,6 +39,7 @@ limitations under the License.
     <module>flume-ng-elasticsearch-sink</module>
     <module>flume-ng-morphline-solr-sink</module>
     <module>flume-ng-kafka-sink</module>
+    <module>flume-ng-kudu-sink</module>
     <module>flume-http-sink</module>
     <module>flume-dataset-sink</module>
     <module>flume-hive-sink</module>
diff --git a/pom.xml b/pom.xml
index be7edbc..ce71aa9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@ limitations under the License.
     <hbase.version>1.0.0</hbase.version>
     <hbase2.jetty.version>9.3.19.v20170502</hbase2.jetty.version>
     <hbase2.version>2.0.0</hbase2.version>
+    <hive.version>1.0.0</hive.version>
     <httpcore.version>4.4.6</httpcore.version>
     <httpclient.version>4.5.3</httpclient.version>
     <irclib.version>1.10</irclib.version>
@@ -81,9 +82,9 @@ limitations under the License.
     <junit.version>4.10</junit.version>
     <kafka.version>2.0.1</kafka.version>
     <kite.version>1.0.0</kite.version>
-    <hive.version>1.0.0</hive.version>
+    <kudu.version>1.10.0</kudu.version>
     <lifecycle-mapping.version>1.0.0</lifecycle-mapping.version>
-    <log4j.version>2.10.0</log4j.version>
+    <log4j.version>2.11.2</log4j.version>
     <mapdb.version>0.9.9</mapdb.version>
     <mina.version>2.0.4</mina.version>
     <mockito.version>1.9.0</mockito.version>