Posted to commits@kudu.apache.org by gr...@apache.org on 2019/11/13 18:32:20 UTC

[kudu] branch master updated: Remove Flume integration module

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new c8d36a9  Remove Flume integration module
c8d36a9 is described below

commit c8d36a9f9424c8bb8ef20e86a576caea818689b0
Author: Grant Henke <gr...@apache.org>
AuthorDate: Wed Sep 18 13:30:02 2019 -0500

    Remove Flume integration module
    
    This patch removes the Flume integration now that it has been moved to
    the Apache Flume project.
    
    Jira: https://issues.apache.org/jira/browse/FLUME-3345
    Commit: https://github.com/apache/flume/commit/9dafe98972a652aa1a202c6df9c08139d2bea592
    
    Change-Id: I56ebc975d99ea49d36c5afc02fa4914b360ee082
    Reviewed-on: http://gerrit.cloudera.org:8080/14255
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 java/buildSrc/build.gradle                         |   1 -
 java/gradle/dependencies.gradle                    |   3 -
 java/kudu-flume-sink/build.gradle                  |  44 ---
 .../flume/sink/AvroKuduOperationsProducer.java     | 309 ----------------
 .../kudu/flume/sink/KuduOperationsProducer.java    |  58 ---
 .../java/org/apache/kudu/flume/sink/KuduSink.java  | 325 -----------------
 .../flume/sink/KuduSinkConfigurationConstants.java |  82 -----
 .../flume/sink/RegexpKuduOperationsProducer.java   | 405 ---------------------
 .../sink/SimpleKeyedKuduOperationsProducer.java    | 138 -------
 .../flume/sink/SimpleKuduOperationsProducer.java   |  95 -----
 .../flume/sink/AvroKuduOperationsProducerTest.java | 191 ----------
 .../sink/KeyedKuduOperationsProducerTest.java      | 187 ----------
 .../org/apache/kudu/flume/sink/KuduSinkTest.java   | 210 -----------
 .../apache/kudu/flume/sink/KuduSinkTestUtil.java   | 102 ------
 ...RegexpKuduOperationsProducerParseErrorTest.java | 297 ---------------
 .../sink/RegexpKuduOperationsProducerTest.java     | 188 ----------
 .../apache/kudu/flume/sink/SecureKuduSinkTest.java | 122 -------
 .../src/test/resources/log4j2.properties           |  32 --
 .../resources/testAvroKuduOperationsProducer.avsc  |  14 -
 java/settings.gradle                               |   1 -
 20 files changed, 2804 deletions(-)

diff --git a/java/buildSrc/build.gradle b/java/buildSrc/build.gradle
index 2cb860c..7560ecd 100644
--- a/java/buildSrc/build.gradle
+++ b/java/buildSrc/build.gradle
@@ -28,7 +28,6 @@ repositories {
 // Manage plugin dependencies since the plugin block can't be used in included build scripts yet.
 // For more details see: https://docs.gradle.org/current/userguide/plugins.html#plugins_dsl_limitations
 dependencies {
-  compile "com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0"
   compile "com.github.ben-manes:gradle-versions-plugin:0.21.0"
   compile "com.github.jengelman.gradle.plugins:shadow:5.0.0"
   compile "gradle.plugin.com.google.gradle:osdetector-gradle-plugin:1.6.2"
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 221e055..7471cec 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -32,7 +32,6 @@ versions += [
     commonsIo      : "2.6",
     errorProne     : "2.3.3",
     errorProneJavac: "9+181-r4173-1",
-    flume          : "1.9.0",
     gradle         : "5.4.1",
     guava          : "27.1-jre",
     hadoop         : "3.2.0",
@@ -80,8 +79,6 @@ libs += [
     commonsIo            : "commons-io:commons-io:$versions.commonsIo",
     errorProne           : "com.google.errorprone:error_prone_core:$versions.errorProne",
     errorProneJavac      : "com.google.errorprone:javac:$versions.errorProneJavac",
-    flumeConfiguration   : "org.apache.flume:flume-ng-configuration:$versions.flume",
-    flumeCore            : "org.apache.flume:flume-ng-core:$versions.flume",
     guava                : "com.google.guava:guava:$versions.guava",
     hadoopClient         : "org.apache.hadoop:hadoop-client:$versions.hadoop",
     hadoopCommon         : "org.apache.hadoop:hadoop-common:$versions.hadoop",
diff --git a/java/kudu-flume-sink/build.gradle b/java/kudu-flume-sink/build.gradle
deleted file mode 100644
index cd61cd1..0000000
--- a/java/kudu-flume-sink/build.gradle
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Add the Avro plugin to support code generation from schema files.
-apply from: "$rootDir/gradle/shadow.gradle"
-apply plugin: "com.commercehub.gradle.plugin.avro"
-
-dependencies {
-  compile project(path: ":kudu-client", configuration: "shadow")
-
-  optional libs.yetusAnnotations
-
-  provided libs.avro
-  provided libs.flumeConfiguration
-  provided libs.flumeCore
-  provided libs.guava
-  provided libs.hadoopCommon
-  provided libs.slf4jApi
-
-  testCompile project(path: ":kudu-test-utils", configuration: "shadow")
-  testCompile libs.junit
-  testCompile libs.log4j
-  testCompile libs.log4jSlf4jImpl
-}
-
-// Configure the Avro plugin to compile the schemas in the
-// resource directory so the schemas are available in tests.
-generateTestAvroJava {
-  source("src/test/resources")
-}
\ No newline at end of file
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
deleted file mode 100644
index f9a7ddc..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.net.URI;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kudu.Type;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-
-/**
- * An Avro serializer that generates one operation per event by deserializing the event
- * body as an Avro record and mapping its fields to columns in a Kudu table.
- *
- * <p><strong>Avro Kudu Operations Producer configuration parameters</strong>
- * <table cellpadding=3 cellspacing=0 border=1 summary="Avro Kudu Operations Producer configuration parameters">
- * <tr><th>Property Name</th>
- *   <th>Default</th>
- *   <th>Required?</th>
- *   <th>Description</th></tr>
- * <tr>
- *   <td>producer.operation</td>
- *   <td>upsert</td>
- *   <td>No</td>
- *   <td>The operation used to write events to Kudu.
- *   Supported operations are 'insert' and 'upsert'</td>
- * </tr>
- * <tr>
- *   <td>producer.schemaPath</td>
- *   <td></td>
- *   <td>No</td>
- *   <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies.
- *   It's used whenever an event does not include its own schema. If not specified, the
- *   schema must be specified on a per-event basis, either by url or as a literal.
- *   Schemas must be record type.</td>
- * </tr>
- * </table>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class AvroKuduOperationsProducer implements KuduOperationsProducer {
-  public static final String OPERATION_PROP = "operation";
-  public static final String SCHEMA_PROP = "schemaPath";
-  public static final String DEFAULT_OPERATION = "upsert";
-  public static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";
-  public static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";
-
-  private String operation;
-  private GenericRecord reuse;
-  private KuduTable table;
-  private String defaultSchemaUrl;
-
-  /**
-   * The binary decoder to reuse for event parsing.
-   */
-  private BinaryDecoder decoder = null;
-
-  /**
-   * A cache of schemas retrieved by URL to avoid re-parsing the schema.
-   */
-  private static final LoadingCache<String, Schema> schemasFromURL =
-      CacheBuilder.newBuilder()
-          .build(new CacheLoader<String, Schema>() {
-            @Override
-            public Schema load(String url) throws IOException {
-              Schema.Parser parser = new Schema.Parser();
-              InputStream is = null;
-              try {
-                FileSystem fs = FileSystem.get(URI.create(url), conf);
-                if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
-                  is = fs.open(new Path(url));
-                } else {
-                  is = new URL(url).openStream();
-                }
-                return parser.parse(is);
-              } finally {
-                if (is != null) {
-                  is.close();
-                }
-              }
-            }
-          });
-
-  /**
-   * A cache of literal schemas to avoid re-parsing the schema.
-   */
-  private static final LoadingCache<String, Schema> schemasFromLiteral =
-      CacheBuilder.newBuilder()
-          .build(new CacheLoader<String, Schema>() {
-            @Override
-            public Schema load(String literal) {
-              Preconditions.checkNotNull(literal,
-                  "Schema literal cannot be null without a Schema URL");
-              return new Schema.Parser().parse(literal);
-            }
-          });
-
-  /**
-   * A cache of DatumReaders per schema.
-   */
-  private static final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
-      CacheBuilder.newBuilder()
-          .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
-            @Override
-            public DatumReader<GenericRecord> load(Schema schema) {
-              return new GenericDatumReader<>(schema);
-            }
-          });
-
-  private static final Configuration conf = new Configuration();
-
-  public AvroKuduOperationsProducer() {
-  }
-
-  @Override
-  public void configure(Context context) {
-    this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION);
-
-    String schemaPath = context.getString(SCHEMA_PROP);
-    if (schemaPath != null) {
-      defaultSchemaUrl = schemaPath;
-    }
-  }
-
-  @Override
-  public void initialize(KuduTable table) {
-    this.table = table;
-  }
-
-  @Override
-  public List<Operation> getOperations(Event event) throws FlumeException {
-    Schema schema = getSchema(event);
-    DatumReader<GenericRecord> reader = readers.getUnchecked(schema);
-    decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
-    try {
-      reuse = reader.read(reuse, decoder);
-    } catch (IOException e) {
-      throw new FlumeException("Cannot deserialize event", e);
-    }
-    Operation op;
-    switch (operation.toLowerCase()) {
-      case "upsert":
-        op = table.newUpsert();
-        break;
-      case "insert":
-        op = table.newInsert();
-        break;
-      default:
-        throw new FlumeException(String.format("Unexpected operation %s", operation));
-    }
-    setupOp(op, schema, reuse);
-    return Collections.singletonList(op);
-  }
-
-  private void setupOp(Operation op, Schema schema, GenericRecord record) {
-    PartialRow row = op.getRow();
-    for (ColumnSchema col : table.getSchema().getColumns()) {
-      String name = col.getName();
-      Object value = record.get(name);
-      if (value == null) {
-        if (col.isNullable()) {
-          row.setNull(name);
-        } else {
-          // leave unset for possible Kudu default
-        }
-      } else {
-        // Avro doesn't support 8- or 16-bit integer types, but we'll allow them to be passed as
-        // a larger type.
-        try {
-          switch (col.getType()) {
-            case BOOL:
-              row.addBoolean(name, (boolean) value);
-              break;
-            case INT8:
-              row.addByte(name, (byte) value);
-              break;
-            case INT16:
-              row.addShort(name, (short) value);
-              break;
-            case INT32:
-              row.addInt(name, (int) value);
-              break;
-            case INT64: // Fall through
-            case UNIXTIME_MICROS:
-              row.addLong(name, (long) value);
-              break;
-            case FLOAT:
-              row.addFloat(name, (float) value);
-              break;
-            case DOUBLE:
-              row.addDouble(name, (double) value);
-              break;
-            case DECIMAL:
-              row.addDecimal(name, getAvroBigDecimal(schema, name, value));
-              break;
-            case STRING:
-              row.addString(name, value.toString());
-              break;
-            case BINARY:
-              row.addBinary(name, (byte[]) value);
-              break;
-            default:
-              throw new FlumeException(String.format(
-                  "Unrecognized type %s for column %s", col.getType().toString(), name));
-          }
-        } catch (ClassCastException e) {
-          throw new FlumeException(
-              String.format("Failed to coerce value for column '%s' to type %s",
-              col.getName(),
-              col.getType()), e);
-        }
-      }
-    }
-  }
-
-  private BigDecimal getAvroBigDecimal(Schema schema, String name, Object value) {
-    LogicalType logicalType = schema.getField(name).schema().getLogicalType();
-    if (!(logicalType instanceof LogicalTypes.Decimal)) {
-      throw new FlumeException(String.format(
-          "Failed to coerce value for column '%s' to type %s",
-          name,
-          Type.DECIMAL));
-    }
-    int scale = ((LogicalTypes.Decimal) logicalType).getScale();
-    BigInteger unscaledValue = new BigInteger(((ByteBuffer) value).array());
-    return new BigDecimal(unscaledValue, scale);
-  }
-
-  private Schema getSchema(Event event) throws FlumeException {
-    Map<String, String> headers = event.getHeaders();
-    String schemaUrl = headers.get(SCHEMA_URL_HEADER);
-    String schemaLiteral = headers.get(SCHEMA_LITERAL_HEADER);
-    try {
-      if (schemaUrl != null) {
-        return schemasFromURL.get(schemaUrl);
-      } else if (schemaLiteral != null) {
-        return schemasFromLiteral.get(schemaLiteral);
-      } else if (defaultSchemaUrl != null) {
-        return schemasFromURL.get(defaultSchemaUrl);
-      } else {
-        throw new FlumeException(
-            String.format("No schema for event. " +
-                "Specify configuration property '%s' or event header '%s'",
-                SCHEMA_PROP,
-                SCHEMA_URL_HEADER));
-      }
-    } catch (ExecutionException e) {
-      throw new FlumeException("Cannot get schema", e);
-    } catch (UncheckedExecutionException e) {
-      throw new FlumeException("Cannot parse schema", e);
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-}
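
For context on the removed AvroKuduOperationsProducer: a minimal, self-contained sketch of its per-event decoding path, assuming the schema arrives as a literal in the 'flume.avro.schema.literal' header documented in the deleted Javadoc. The class and method names here are hypothetical; the Avro and Flume calls mirror the deleted code.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.flume.Event;

    // Hypothetical helper: decodes an Avro-encoded Flume event body the way the
    // removed producer did when the schema was supplied as a header literal.
    public class AvroEventDecodeSketch {
      public static GenericRecord decode(Event event) throws Exception {
        // The removed producer also accepted 'flume.avro.schema.url' and a
        // configured 'producer.schemaPath', and cached parsed schemas.
        String literal = event.getHeaders().get("flume.avro.schema.literal");
        Schema schema = new Schema.Parser().parse(literal);

        // Binary-decode the event body into a generic Avro record.
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
        return reader.read(null, decoder);
      }
    }

The producer then walked the destination table's column schema and copied the record field of the same name into an Insert or Upsert row, as in setupOp() above.
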
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java
deleted file mode 100644
index d886b84..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import java.util.List;
-
-import org.apache.flume.Event;
-import org.apache.flume.conf.Configurable;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-
-/**
- * Interface for an operations producer that produces Kudu Operations from
- * Flume events.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface KuduOperationsProducer extends Configurable, AutoCloseable {
-  /**
-   * Initializes the operations producer. Called between configure and
-   * getOperations.
-   * @param table the KuduTable used to create Kudu Operation objects
-   */
-  void initialize(KuduTable table);
-
-  /**
-   * Returns the operations that should be written to Kudu as a result of this event.
-   * @param event Event to convert to one or more Operations
-   * @return List of Operations that should be written to Kudu
-   */
-  List<Operation> getOperations(Event event);
-
-  /**
-   * Cleans up any state. Called when the sink is stopped.
-   */
-  @Override
-  void close();
-}
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
deleted file mode 100644
index 969c712..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.BATCH_SIZE;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_KEYTAB;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PROXY_USER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TIMEOUT_MILLIS;
-
-import java.security.PrivilegedAction;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Transaction;
-import org.apache.flume.auth.FlumeAuthenticationUtil;
-import org.apache.flume.auth.PrivilegedExecutor;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.client.AsyncKuduClient;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.OperationResponse;
-import org.apache.kudu.client.SessionConfiguration;
-
-/**
- * A Flume sink that reads events from a channel and writes them to a Kudu table.
- *
- * <p><strong>Flume Kudu Sink configuration parameters</strong>
- *
- * <table cellpadding=3 cellspacing=0 border=1 summary="Flume Kudu Sink configuration parameters">
- * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
- * <tr><td>channel</td><td></td><td>Yes</td><td>The name of the Flume channel to read.</td></tr>
- * <tr><td>type</td><td></td><td>Yes</td>
- *     <td>Component name. Must be {@code org.apache.kudu.flume.sink.KuduSink}</td></tr>
- * <tr><td>masterAddresses</td><td></td><td>Yes</td>
- *     <td>Comma-separated list of "host:port" Kudu master addresses.
- *     The port is optional.</td></tr>
- * <tr><td>tableName</td><td></td><td>Yes</td>
- *     <td>The name of the Kudu table to write to.</td></tr>
- * <tr><td>batchSize</td><td>1000</td><td>No</td>
- * <td>The maximum number of events the sink takes from the channel per transaction.</td></tr>
- * <tr><td>ignoreDuplicateRows</td><td>true</td>
- *     <td>No</td><td>Whether to ignore duplicate primary key errors caused by inserts.</td></tr>
- * <tr><td>timeoutMillis</td><td>10000</td><td>No</td>
- *     <td>Timeout period for Kudu write operations, in milliseconds.</td></tr>
- * <tr><td>producer</td><td>{@link SimpleKuduOperationsProducer}</td><td>No</td>
- *     <td>The fully-qualified class name of the {@link KuduOperationsProducer}
- *     the sink should use.</td></tr>
- * <tr><td>producer.*</td><td></td><td>(Varies by operations producer)</td>
- *     <td>Configuration properties to pass to the operations producer implementation.</td></tr>
- * </table>
- *
- * <p><strong>Installation</strong>
- *
- * <p>After building the sink, in order to use it with Flume, place the file named
- * <tt>kudu-flume-sink-<em>VERSION</em>-jar-with-dependencies.jar</tt> in the
- * Flume <tt>plugins.d</tt> directory under <tt>kudu-flume-sink/lib/</tt>.
- *
- * <p>For detailed instructions on using Flume's plugins.d mechanism, please see the plugins.d
- * section of the <a href="https://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory">Flume User Guide</a>.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduSink extends AbstractSink implements Configurable {
-  private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
-  private static final int DEFAULT_BATCH_SIZE = 1000;
-  private static final Long DEFAULT_TIMEOUT_MILLIS =
-          AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
-  private static final String DEFAULT_KUDU_OPERATION_PRODUCER =
-          "org.apache.kudu.flume.sink.SimpleKuduOperationsProducer";
-  private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
-
-  private String masterAddresses;
-  private String tableName;
-  private int batchSize;
-  private long timeoutMillis;
-  private boolean ignoreDuplicateRows;
-  private KuduTable table;
-  private KuduSession session;
-  private KuduClient client;
-  private KuduOperationsProducer operationsProducer;
-  private SinkCounter sinkCounter;
-  private PrivilegedExecutor privilegedExecutor;
-
-  public KuduSink() {
-    this(null);
-  }
-
-  @InterfaceAudience.LimitedPrivate("Test")
-  @InterfaceAudience.Private
-  public KuduSink(KuduClient kuduClient) {
-    this.client = kuduClient;
-  }
-
-  @Override
-  public synchronized void start() {
-    Preconditions.checkState(table == null && session == null,
-        "Please call stop before calling start on an old instance.");
-
-    // Client is not null only inside tests.
-    if (client == null) {
-      // Creating client with FlumeAuthenticator.
-      client = privilegedExecutor.execute(
-        new PrivilegedAction<KuduClient>() {
-          @Override
-          public KuduClient run() {
-            return new KuduClient.KuduClientBuilder(masterAddresses).build();
-          }
-        }
-      );
-    }
-    session = client.newSession();
-    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
-    session.setTimeoutMillis(timeoutMillis);
-    session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
-    session.setMutationBufferSpace(batchSize);
-
-    try {
-      table = client.openTable(tableName);
-    } catch (Exception ex) {
-      sinkCounter.incrementConnectionFailedCount();
-      String msg = String.format("Could not open Kudu table '%s'", tableName);
-      logger.error(msg, ex);
-      throw new FlumeException(msg, ex);
-    }
-    operationsProducer.initialize(table);
-
-    super.start();
-    sinkCounter.incrementConnectionCreatedCount();
-    sinkCounter.start();
-  }
-
-  @Override
-  public synchronized void stop() {
-    Exception ex = null;
-    try {
-      operationsProducer.close();
-    } catch (Exception e) {
-      ex = e;
-      logger.error("Error closing operations producer", e);
-    }
-    try {
-      if (client != null) {
-        client.shutdown();
-      }
-    } catch (Exception e) {
-      ex = e;
-      logger.error("Error closing client", e);
-    }
-    client = null;
-    table = null;
-    session = null;
-
-    sinkCounter.incrementConnectionClosedCount();
-    sinkCounter.stop();
-    if (ex != null) {
-      throw new FlumeException("Error stopping sink", ex);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void configure(Context context) {
-    masterAddresses = context.getString(MASTER_ADDRESSES);
-    Preconditions.checkNotNull(masterAddresses,
-        "Missing master addresses. Please specify property '%s'.",
-        MASTER_ADDRESSES);
-
-    tableName = context.getString(TABLE_NAME);
-    Preconditions.checkNotNull(tableName,
-        "Missing table name. Please specify property '%s'",
-        TABLE_NAME);
-
-    batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
-    timeoutMillis = context.getLong(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
-    ignoreDuplicateRows = context.getBoolean(IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
-    String operationProducerType = context.getString(PRODUCER);
-    String kerberosPrincipal = context.getString(KERBEROS_PRINCIPAL);
-    String kerberosKeytab = context.getString(KERBEROS_KEYTAB);
-    String proxyUser = context.getString(PROXY_USER);
-
-    privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
-        kerberosPrincipal, kerberosKeytab).proxyAs(proxyUser);
-
-    // Check for operations producer, if null set default operations producer type.
-    if (operationProducerType == null || operationProducerType.isEmpty()) {
-      operationProducerType = DEFAULT_KUDU_OPERATION_PRODUCER;
-      logger.warn("No Kudu operations producer provided, using default");
-    }
-
-    Context producerContext = new Context();
-    producerContext.putAll(context.getSubProperties(
-        KuduSinkConfigurationConstants.PRODUCER_PREFIX));
-
-    try {
-      Class<? extends KuduOperationsProducer> clazz =
-          (Class<? extends KuduOperationsProducer>)
-          Class.forName(operationProducerType);
-      operationsProducer = clazz.getDeclaredConstructor().newInstance();
-      operationsProducer.configure(producerContext);
-    } catch (Exception e) {
-      logger.error("Could not instantiate Kudu operations producer" , e);
-      throw new RuntimeException(e);
-    }
-    sinkCounter = new SinkCounter(this.getName());
-  }
-
-  public KuduClient getClient() {
-    return client;
-  }
-
-  @Override
-  public Status process() throws EventDeliveryException {
-    if (session.hasPendingOperations()) {
-      // If for whatever reason we have pending operations, refuse to process
-      // more and tell the caller to try again a bit later. We don't want to
-      // pile on the KuduSession.
-      return Status.BACKOFF;
-    }
-
-    Channel channel = getChannel();
-    Transaction txn = channel.getTransaction();
-
-    txn.begin();
-
-    try {
-      long txnEventCount = 0;
-      for (; txnEventCount < batchSize; txnEventCount++) {
-        Event event = channel.take();
-        if (event == null) {
-          break;
-        }
-
-        List<Operation> operations = operationsProducer.getOperations(event);
-        for (Operation o : operations) {
-          session.apply(o);
-        }
-      }
-
-      logger.debug("Flushing {} events", txnEventCount);
-      List<OperationResponse> responses = session.flush();
-      if (responses != null) {
-        for (OperationResponse response : responses) {
-          // Throw an EventDeliveryException if at least one of the responses was
-          // a row error. Row errors can occur for example when an event is inserted
-          // into Kudu successfully but the Flume transaction is rolled back for some reason,
-          // and a subsequent replay of the same Flume transaction leads to a
-          // duplicate key error since the row already exists in Kudu.
-          // Note: Duplicate keys will not be reported as errors if ignoreDuplicateRows
-          // is enabled in the config.
-          if (response.hasRowError()) {
-            throw new EventDeliveryException("Failed to flush one or more changes. " +
-                "Transaction rolled back: " + response.getRowError().toString());
-          }
-        }
-      }
-
-      if (txnEventCount == 0) {
-        sinkCounter.incrementBatchEmptyCount();
-      } else if (txnEventCount == batchSize) {
-        sinkCounter.incrementBatchCompleteCount();
-      } else {
-        sinkCounter.incrementBatchUnderflowCount();
-      }
-
-      txn.commit();
-
-      if (txnEventCount == 0) {
-        return Status.BACKOFF;
-      }
-
-      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
-      return Status.READY;
-
-    } catch (Throwable e) {
-      txn.rollback();
-
-      String msg = "Failed to commit transaction. Transaction rolled back.";
-      logger.error(msg, e);
-      if (e instanceof Error || e instanceof RuntimeException) {
-        throw new RuntimeException(e);
-      } else {
-        logger.error(msg, e);
-        throw new EventDeliveryException(msg, e);
-      }
-    } finally {
-      txn.close();
-    }
-  }
-}
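
For context on the removed KuduSink: a minimal sketch of the write path it drove per Flume transaction, mirroring the deleted code (MANUAL_FLUSH session, apply per operation, a single flush, then a row-error check). The master address, table name, and column names below are hypothetical stand-ins for the 'masterAddresses' and 'tableName' configuration properties documented above.

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    import org.apache.kudu.client.KuduClient;
    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.OperationResponse;
    import org.apache.kudu.client.SessionConfiguration;
    import org.apache.kudu.client.Upsert;

    public class KuduWritePathSketch {
      public static void main(String[] args) throws Exception {
        KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();
        try {
          KuduTable table = client.openTable("flume_events");

          // The sink batched operations in a manually flushed session.
          KuduSession session = client.newSession();
          session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
          session.setIgnoreAllDuplicateRows(true); // 'ignoreDuplicateRows'
          session.setMutationBufferSpace(1000);    // 'batchSize'

          // One upsert standing in for the operations a KuduOperationsProducer returns.
          Upsert upsert = table.newUpsert();
          upsert.getRow().addString("key", "event-0001");
          upsert.getRow().addBinary("payload", "hello".getBytes(StandardCharsets.UTF_8));
          session.apply(upsert);

          // The sink flushed once per transaction and rolled the transaction back
          // if any response carried a row error.
          List<OperationResponse> responses = session.flush();
          for (OperationResponse response : responses) {
            if (response.hasRowError()) {
              throw new RuntimeException(response.getRowError().toString());
            }
          }
        } finally {
          client.shutdown();
        }
      }
    }
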
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
deleted file mode 100644
index dbb2f66..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduSinkConfigurationConstants {
-  /**
-   * Comma-separated list of "host:port" Kudu master addresses.
-   * The port is optional and defaults to the Kudu Java client's default master
-   * port.
-   */
-  public static final String MASTER_ADDRESSES = "masterAddresses";
-
-  /**
-   * The name of the table in Kudu to write to.
-   */
-  public static final String TABLE_NAME = "tableName";
-
-  /**
-   * The fully qualified class name of the KuduOperationsProducer class that the
-   * sink should use.
-   */
-  public static final String PRODUCER = "producer";
-
-  /**
-   * Prefix for configuration parameters that are passed to the
-   * KuduOperationsProducer.
-   */
-  public static final String PRODUCER_PREFIX = PRODUCER + ".";
-
-  /**
-   * Maximum number of events that the sink should take from the channel per
-   * transaction.
-   */
-  public static final String BATCH_SIZE = "batchSize";
-
-  /**
-   * Timeout period for Kudu operations, in milliseconds.
-   */
-  public static final String TIMEOUT_MILLIS = "timeoutMillis";
-
-  /**
-   * Whether to ignore duplicate primary key errors caused by inserts.
-   */
-  public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
-
-  /**
-   * Path to the keytab file used for authentication
-   */
-  public static final String KERBEROS_KEYTAB = "kerberosKeytab";
-
-  /**
-   * Kerberos principal used for authentication
-   */
-  public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
-
-  /**
-   * The effective user if different from the kerberos principal
-   */
-  public static final String PROXY_USER = "proxyUser";
-}
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
deleted file mode 100644
index a3b2c57..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import java.math.BigDecimal;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.Upsert;
-
-/**
- * A regular expression operations producer that generates one or more Kudu
- * {@link Insert} or {@link Upsert} operations per Flume {@link Event} by
- * parsing the event {@code body} using a regular expression. Values are
- * coerced to the types of the named columns in the Kudu table.
- *
- * <p>Example: If the Kudu table has the schema:
- *
- * <pre>
- * key INT32
- * name STRING</pre>
- *
- * <p>and {@code producer.pattern = (?<key>\\d+),(?<name>\\w+)} then
- * {@code RegexpKuduOperationsProducer} will parse the string:
- *
- * <pre>|12345,Mike||54321,Todd|</pre>
- *
- * into the rows: {@code (key=12345, name=Mike)} and {@code (key=54321, name=Todd)}.
- *
- * <p>Note: This class relies on JDK7 named capturing groups, which are
- * documented in {@link Pattern}. The name of each capturing group must
- * correspond to a column name in the destination Kudu table.
- *
- * <p><strong><code>RegexpKuduOperationsProducer</code> Flume Configuration Parameters</strong></p>
- *
- * <table cellpadding=3 cellspacing=0 border=1 summary="Flume Configuration Parameters">
- * <tr>
- *   <th>Property Name</th>
- *   <th>Default</th>
- *   <th>Required?</th>
- *   <th>Description</th>
- * </tr>
- * <tr>
- *   <td>producer.pattern</td>
- *   <td></td>
- *   <td>Yes</td>
- *   <td>The regular expression used to parse the event body.</td>
- * </tr>
- * <tr>
- *   <td>producer.charset</td>
- *   <td>utf-8</td>
- *   <td>No</td>
- *   <td>The character set of the event body.</td>
- * </tr>
- * <tr>
- *   <td>producer.operation</td>
- *   <td>upsert</td>
- *   <td>No</td>
- *   <td>Operation type used to write the event to Kudu. Must be either
- *   {@code insert} or {@code upsert}.</td>
- * </tr>
- * <tr>
- *   <td>producer.skipMissingColumn</td>
- *   <td>false</td>
- *   <td>No</td>
- *   <td>
- *   <b>@deprecated</b><br/> use {@code producer.missingColumnPolicy}
- *   What to do if a column in the Kudu table has no corresponding capture group.
- *   If set to {@code true}, a warning message is logged and the operation is still attempted.
- *   If set to {@code false}, an exception is thrown and the sink will not process the
- *   {@code Event}, causing a Flume {@code Channel} rollback.
- * </tr>
- * <tr>
- *   <td>producer.skipBadColumnValue</td>
- *   <td>false</td>
- *   <td>No</td>
- *   <td>
- *   <b>@deprecated</b><br/> use {@code producer.badColumnValuePolicy}
- *   What to do if a value in the pattern match cannot be coerced to the required type.
- *   If set to {@code true}, a warning message is logged and the operation is still attempted.
- *   If set to {@code false}, an exception is thrown and the sink will not process the
- *   {@code Event}, causing a Flume {@code Channel} rollback.
- * </tr>
- * <tr>
- *   <td>producer.warnUnmatchedRows</td>
- *   <td>true</td>
- *   <td>No</td>
- *   <td>
- *   <b>@deprecated</b><br/> use {@code producer.unmatchedRowPolicy}
- *   Whether to log a warning about payloads that do not match the pattern. If set to
- *   {@code false}, event bodies with no matches will be silently dropped.</td>
- * </tr>
- * <tr>
- *   <td>producer.missingColumnPolicy</td>
- *   <td>REJECT</td>
- *   <td>No</td>
- *   <td>What to do if a column in the Kudu table has no corresponding capture group.<br/>
- *   If set to {@code REJECT}, an exception is thrown and the sink will not process the
- *   {@code Event}, causing a Flume {@code Channel} rollback.<br/>
- *   If set to {@code WARN}, a warning message is logged and the operation is still produced.<br/>
- *   If set to {@code IGNORE}, the operation is still produced without any log message.
- * </tr>
- * <tr>
- *   <td>producer.badColumnValuePolicy</td>
- *   <td>REJECT</td>
- *   <td>No</td>
- *   <td>What to do if a value in the pattern match cannot be coerced to the required type.<br/>
- *   If set to {@code REJECT}, an exception is thrown and the sink will not process the
- *   {@code Event}, causing a Flume {@code Channel} rollback.<br/>
- *   If set to {@code WARN}, a warning message is logged and the operation is still produced,
- *   but does not include the given column.<br/>
- *   If set to {@code IGNORE}, the operation is still produced, but does not include the given
- *   column and does not log any message.
- * </tr>
- * <tr>
- *   <td>producer.unmatchedRowPolicy</td>
- *   <td>WARN</td>
- *   <td>No</td>
- *   <td>What to do if a payload does not match the pattern.<br/>
- *   If set to {@code REJECT}, an exception is thrown and the sink will not process the
- *   {@code Event}, causing a Flume {@code Channel} rollback.<br/>
- *   If set to {@code WARN}, a warning message is logged and the row is skipped,
- *   not producing an operation.<br/>
- *   If set to {@code IGNORE}, the row is skipped without any log message.
- * </tr>
- * </table>
- *
- * @see Pattern
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class RegexpKuduOperationsProducer implements KuduOperationsProducer {
-  private static final Logger logger = LoggerFactory.getLogger(RegexpKuduOperationsProducer.class);
-  private static final String INSERT = "insert";
-  private static final String UPSERT = "upsert";
-  private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT);
-
-  public static final String PATTERN_PROP = "pattern";
-  public static final String ENCODING_PROP = "encoding";
-  public static final String DEFAULT_ENCODING = "utf-8";
-  public static final String OPERATION_PROP = "operation";
-  public static final String DEFAULT_OPERATION = UPSERT;
-  @Deprecated
-  public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
-  @Deprecated
-  public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
-  @Deprecated
-  public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
-  @Deprecated
-  public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
-  @Deprecated
-  public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows";
-  @Deprecated
-  public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;
-  public static final String MISSING_COLUMN_POLICY_PROP = "missingColumnPolicy";
-  public static final ParseErrorPolicy DEFAULT_MISSING_COLUMN_POLICY = ParseErrorPolicy.REJECT;
-  public static final String BAD_COLUMN_VALUE_POLICY_PROP = "badColumnValuePolicy";
-  public static final ParseErrorPolicy DEFAULT_BAD_COLUMN_VALUE_POLICY = ParseErrorPolicy.REJECT;
-  public static final String UNMATCHED_ROW_POLICY_PROP = "unmatchedRowPolicy";
-  public static final ParseErrorPolicy DEFAULT_UNMATCHED_ROW_POLICY = ParseErrorPolicy.WARN;
-
-  private KuduTable table;
-  private Pattern pattern;
-  private Charset charset;
-  private String operation;
-  private ParseErrorPolicy missingColumnPolicy;
-  private ParseErrorPolicy badColumnValuePolicy;
-  private ParseErrorPolicy unmatchedRowPolicy;
-
-  public RegexpKuduOperationsProducer() {
-  }
-
-  @Override
-  public void configure(Context context) {
-    String regexp = context.getString(PATTERN_PROP);
-    Preconditions.checkArgument(regexp != null,
-        "Required parameter %s is not specified",
-        PATTERN_PROP);
-    try {
-      pattern = Pattern.compile(regexp);
-    } catch (PatternSyntaxException e) {
-      throw new IllegalArgumentException(
-          String.format("The pattern '%s' is invalid", regexp), e);
-    }
-    String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING);
-    try {
-      charset = Charset.forName(charsetName);
-    } catch (IllegalArgumentException e) {
-      throw new FlumeException(
-          String.format("Invalid or unsupported charset %s", charsetName), e);
-    }
-    operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase();
-    Preconditions.checkArgument(
-        validOperations.contains(operation),
-        "Unrecognized operation '%s'",
-        operation);
-
-
-    missingColumnPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
-      context, SKIP_MISSING_COLUMN_PROP, MISSING_COLUMN_POLICY_PROP,
-      ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_MISSING_COLUMN_POLICY
-    );
-
-    badColumnValuePolicy = getParseErrorPolicyCheckingDeprecatedProperty(
-      context, SKIP_BAD_COLUMN_VALUE_PROP, BAD_COLUMN_VALUE_POLICY_PROP,
-      ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_BAD_COLUMN_VALUE_POLICY
-    );
-
-    unmatchedRowPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
-      context, WARN_UNMATCHED_ROWS_PROP, UNMATCHED_ROW_POLICY_PROP,
-      ParseErrorPolicy.WARN, ParseErrorPolicy.IGNORE, DEFAULT_UNMATCHED_ROW_POLICY
-    );
-  }
-
-  @Override
-  public void initialize(KuduTable table) {
-    this.table = table;
-  }
-
-  @Override
-  public List<Operation> getOperations(Event event) throws FlumeException {
-    String raw = new String(event.getBody(), charset);
-    Matcher m = pattern.matcher(raw);
-    boolean match = false;
-    Schema schema = table.getSchema();
-    List<Operation> ops = Lists.newArrayList();
-    while (m.find()) {
-      match = true;
-      Operation op;
-      switch (operation) {
-        case UPSERT:
-          op = table.newUpsert();
-          break;
-        case INSERT:
-          op = table.newInsert();
-          break;
-        default:
-          throw new FlumeException(
-              String.format("Unrecognized operation type '%s' in getOperations(): " +
-                  "this should never happen!", operation));
-      }
-      PartialRow row = op.getRow();
-      for (ColumnSchema col : schema.getColumns()) {
-        try {
-          coerceAndSet(m.group(col.getName()), col.getName(), col.getType(), row);
-        } catch (NumberFormatException e) {
-          String msg = String.format(
-              "Raw value '%s' couldn't be parsed to type %s for column '%s'",
-              raw, col.getType(), col.getName());
-          logOrThrow(badColumnValuePolicy, msg, e);
-        } catch (IllegalArgumentException e) {
-          String msg = String.format(
-              "Column '%s' has no matching group in '%s'",
-              col.getName(), raw);
-          logOrThrow(missingColumnPolicy, msg, e);
-        } catch (Exception e) {
-          throw new FlumeException("Failed to create Kudu operation", e);
-        }
-      }
-      ops.add(op);
-    }
-    if (!match) {
-      String msg = String.format("Failed to match the pattern '%s' in '%s'", pattern, raw);
-      logOrThrow(unmatchedRowPolicy, msg, null);
-    }
-    return ops;
-  }
-
-  /**
-   * Coerces the string `rawVal` to the type `type` and sets the resulting
-   * value for column `colName` in `row`.
-   *
-   * @param rawVal the raw string column value
-   * @param colName the name of the column
-   * @param type the Kudu type to convert `rawVal` to
-   * @param row the row to set the value in
-   * @throws NumberFormatException if `rawVal` cannot be cast as `type`.
-   */
-  private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row)
-      throws NumberFormatException {
-    switch (type) {
-      case BOOL:
-        row.addBoolean(colName, Boolean.parseBoolean(rawVal));
-        break;
-      case INT8:
-        row.addByte(colName, Byte.parseByte(rawVal));
-        break;
-      case INT16:
-        row.addShort(colName, Short.parseShort(rawVal));
-        break;
-      case INT32:
-        row.addInt(colName, Integer.parseInt(rawVal));
-        break;
-      case INT64: // Fall through
-      case UNIXTIME_MICROS:
-        row.addLong(colName, Long.parseLong(rawVal));
-        break;
-      case FLOAT:
-        row.addFloat(colName, Float.parseFloat(rawVal));
-        break;
-      case DOUBLE:
-        row.addDouble(colName, Double.parseDouble(rawVal));
-        break;
-      case DECIMAL:
-        row.addDecimal(colName, new BigDecimal(rawVal));
-        break;
-      case BINARY:
-        row.addBinary(colName, rawVal.getBytes(charset));
-        break;
-      case STRING:
-        row.addString(colName, rawVal);
-        break;
-      default:
-        logger.warn("got unknown type {} for column '{}'-- ignoring this column",
-            type, colName);
-    }
-  }
-
-  private void logOrThrow(ParseErrorPolicy policy, String msg, Exception e)
-      throws FlumeException {
-    switch (policy) {
-      case REJECT:
-        throw new FlumeException(msg, e);
-      case WARN:
-        logger.warn(msg, e);
-        break;
-      case IGNORE:
-        // Fall through
-      default:
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-
-  private ParseErrorPolicy getParseErrorPolicyCheckingDeprecatedProperty(
-      Context context, String deprecatedPropertyName, String newPropertyName,
-      ParseErrorPolicy trueValue, ParseErrorPolicy falseValue, ParseErrorPolicy defaultValue) {
-    ParseErrorPolicy policy;
-    if (context.containsKey(deprecatedPropertyName)) {
-      logger.info("Configuration property {} is deprecated. Use {} instead.",
-          deprecatedPropertyName, newPropertyName);
-      Preconditions.checkArgument(!context.containsKey(newPropertyName),
-          "Both {} and {} specified. Use only one of them, preferably {}.",
-          deprecatedPropertyName, newPropertyName, newPropertyName);
-      policy = context.getBoolean(deprecatedPropertyName) ? trueValue : falseValue;
-    } else {
-      String policyString = context.getString(newPropertyName, defaultValue.name());
-      try {
-        policy = ParseErrorPolicy.valueOf(policyString.toUpperCase());
-      } catch (IllegalArgumentException e) {
-        throw new IllegalArgumentException(
-          "Unknown policy '" + policyString + "'. Use one of the following: " +
-              Arrays.toString(ParseErrorPolicy.values()), e);
-      }
-    }
-
-    return policy;
-  }
-
-  private enum ParseErrorPolicy {
-    WARN,
-    IGNORE,
-    REJECT
-  }
-}
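
For context on the removed RegexpKuduOperationsProducer: a minimal sketch of the JDK named-capturing-group parsing its Javadoc describes, using the example pattern and input from that Javadoc. The producer looked up each destination column by name via Matcher.group(columnName) and coerced the matched text to the column's Kudu type.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class RegexpParseSketch {
      public static void main(String[] args) {
        // Example pattern and payload from the removed Javadoc; each group name
        // must match a column name in the destination Kudu table.
        Pattern pattern = Pattern.compile("(?<key>\\d+),(?<name>\\w+)");
        Matcher m = pattern.matcher("|12345,Mike||54321,Todd|");
        while (m.find()) {
          // In the producer, 'key' would be coerced to INT32 and 'name' to STRING.
          System.out.println("key=" + m.group("key") + ", name=" + m.group("name"));
        }
        // Prints: key=12345, name=Mike
        //         key=54321, name=Todd
      }
    }
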
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java
deleted file mode 100644
index b421771..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.Upsert;
-
-/**
- * A simple serializer that generates one {@link Insert} or {@link Upsert}
- * per {@link Event} by writing the event body into a BINARY column. The pair
- * (key column name, key column value) should be a header in the {@link Event};
- * the column name is configurable but the column type must be STRING. Multiple
- * key columns are not supported.
- *
- * <p><strong>Simple Keyed Kudu Operations Producer configuration parameters</strong>
- *
- * <table cellpadding=3 cellspacing=0 border=1 summary="Simple Keyed Kudu Operations Producer configuration parameters">
- * <tr>
- *   <th>Property Name</th>
- *   <th>Default</th>
- *   <th>Required?</th>
- *   <th>Description</th>
- * </tr>
- * <tr>
- *   <td>producer.payloadColumn</td>
- *   <td>payload</td>
- *   <td>No</td>
- *   <td>The name of the BINARY column to write the Flume event body to.</td>
- * </tr>
- * <tr>
- *   <td>producer.keyColumn</td>
- *   <td>key</td>
- *   <td>No</td>
- *   <td>The name of the STRING key column of the target Kudu table.</td>
- * </tr>
- * <tr>
- *   <td>producer.operation</td>
- *   <td>upsert</td>
- *   <td>No</td>
- *   <td>The operation used to write events to Kudu. Supported operations
- *   are 'insert' and 'upsert'</td>
- * </tr>
- * </table>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class SimpleKeyedKuduOperationsProducer implements KuduOperationsProducer {
-  public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
-  public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
-  public static final String KEY_COLUMN_PROP = "keyColumn";
-  public static final String KEY_COLUMN_DEFAULT = "key";
-  public static final String OPERATION_PROP = "operation";
-  public static final String OPERATION_DEFAULT = "upsert";
-
-  private KuduTable table;
-  private String payloadColumn;
-  private String keyColumn;
-  private String operation;
-
-  public SimpleKeyedKuduOperationsProducer(){
-  }
-
-  @Override
-  public void configure(Context context) {
-    payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
-    keyColumn = context.getString(KEY_COLUMN_PROP, KEY_COLUMN_DEFAULT);
-    operation = context.getString(OPERATION_PROP, OPERATION_DEFAULT);
-  }
-
-  @Override
-  public void initialize(KuduTable table) {
-    this.table = table;
-  }
-
-  @Override
-  public List<Operation> getOperations(Event event) throws FlumeException {
-    String key = event.getHeaders().get(keyColumn);
-    if (key == null) {
-      throw new FlumeException(
-          String.format("No value provided for key column %s", keyColumn));
-    }
-    try {
-      Operation op;
-      switch (operation.toLowerCase()) {
-        case "upsert":
-          op = table.newUpsert();
-          break;
-        case "insert":
-          op = table.newInsert();
-          break;
-        default:
-          throw new FlumeException(
-              String.format("Unexpected operation %s", operation));
-      }
-      PartialRow row = op.getRow();
-      row.addString(keyColumn, key);
-      row.addBinary(payloadColumn, event.getBody());
-
-      return Collections.singletonList(op);
-    } catch (Exception e) {
-      throw new FlumeException("Failed to create Kudu Operation object", e);
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-}
-
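
For reference, below is a minimal sketch of driving the removed SimpleKeyedKuduOperationsProducer directly, outside a Flume agent. The master address, table name, example class name, and event contents are illustrative assumptions; the configuration keys and the configure/initialize/getOperations lifecycle come from the class above, and the operations are applied through a plain KuduSession rather than through KuduSink.

import java.nio.charset.StandardCharsets;
import java.util.List;

import com.google.common.collect.ImmutableMap;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer;

public class SimpleKeyedProducerExample {
  public static void main(String[] args) throws Exception {
    // Assumes a Kudu table with a STRING "key" column and a BINARY "payload" column exists.
    KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();
    KuduTable table = client.openTable("flume_events");

    // Unprefixed keys, as when configuring the producer directly; the sink tests
    // pass the same keys behind the "producer." prefix.
    SimpleKeyedKuduOperationsProducer producer = new SimpleKeyedKuduOperationsProducer();
    producer.configure(new Context(ImmutableMap.of("operation", "upsert")));
    producer.initialize(table);

    // One upsert per event: the key comes from the "key" header, the body goes to "payload".
    Event event = EventBuilder.withBody("payload body 0".getBytes(StandardCharsets.UTF_8));
    event.setHeaders(ImmutableMap.of("key", "key 0"));
    List<Operation> ops = producer.getOperations(event);

    // The sink normally applies these through its own session; done directly here.
    KuduSession session = client.newSession();
    for (Operation op : ops) {
      session.apply(op);
    }
    session.close();
    producer.close();
    client.close();
  }
}
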
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java
deleted file mode 100644
index acd74e1..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-
-/**
- * A simple serializer that generates one {@link Insert} per {@link Event}
- * by writing the event body into a BINARY column. The headers are discarded.
- *
- * <p><strong>Simple Kudu Operations Producer configuration parameters</strong>
- *
- * <table cellpadding=3 cellspacing=0 border=1 summary="Simple Kudu Operations Producer configuration parameters">
- * <tr>
- *   <th>Property Name</th>
- *   <th>Default</th>
- *   <th>Required?</th>
- *   <th>Description</th>
- * </tr>
- * <tr>
- *   <td>producer.payloadColumn</td>
- *   <td>payload</td>
- *   <td>No</td>
- *   <td>The name of the BINARY column to write the Flume event body to.</td>
- * </tr>
- * </table>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class SimpleKuduOperationsProducer implements KuduOperationsProducer {
-  public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
-  public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
-
-  private KuduTable table;
-  private String payloadColumn;
-
-  public SimpleKuduOperationsProducer() {
-  }
-
-  @Override
-  public void configure(Context context) {
-    payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
-  }
-
-  @Override
-  public void initialize(KuduTable table) {
-    this.table = table;
-  }
-
-  @Override
-  public List<Operation> getOperations(Event event) throws FlumeException {
-    try {
-      Insert insert = table.newInsert();
-      PartialRow row = insert.getRow();
-      row.addBinary(payloadColumn, event.getBody());
-
-      return Collections.singletonList((Operation) insert);
-    } catch (Exception e) {
-      throw new FlumeException("Failed to create Kudu Insert object", e);
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-}
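
Likewise, a sketch of the sink-level wiring that selects SimpleKuduOperationsProducer and overrides its payload column via the "producer." prefix, mirroring how the tests below assemble a KuduSink with a MemoryChannel. The table name, master address, example class name, and event payload are illustrative assumptions.

import java.nio.charset.StandardCharsets;

import com.google.common.collect.ImmutableMap;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.flume.sink.KuduSink;
import org.apache.kudu.flume.sink.KuduSinkConfigurationConstants;
import org.apache.kudu.flume.sink.SimpleKuduOperationsProducer;

public class SimpleProducerSinkWiringExample {
  public static void main(String[] args) throws Exception {
    KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();

    // Select the producer and override its payload column through the "producer." prefix.
    Context context = new Context(ImmutableMap.of(
        KuduSinkConfigurationConstants.TABLE_NAME, "flume_events",
        KuduSinkConfigurationConstants.MASTER_ADDRESSES, "kudu-master:7051",
        KuduSinkConfigurationConstants.PRODUCER, SimpleKuduOperationsProducer.class.getName(),
        KuduSinkConfigurationConstants.PRODUCER_PREFIX +
            SimpleKuduOperationsProducer.PAYLOAD_COLUMN_PROP, "payload"));

    KuduSink sink = new KuduSink(client);
    Configurables.configure(sink, context);
    Channel channel = new MemoryChannel();
    Configurables.configure(channel, new Context());
    sink.setChannel(channel);
    sink.start();

    // Put one event on the channel and let the sink drain it into the table.
    Transaction tx = channel.getTransaction();
    tx.begin();
    channel.put(EventBuilder.withBody("payload body 0".getBytes(StandardCharsets.UTF_8)));
    tx.commit();
    tx.close();
    sink.process();

    sink.stop();
  }
}
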
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
deleted file mode 100644
index 5517f1a..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_LITERAL_HEADER;
-import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_PROP;
-import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_URL_HEADER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.math.BigDecimal;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.test.KuduTestHarness;
-import org.apache.kudu.util.DecimalUtil;
-
-public class AvroKuduOperationsProducerTest {
-  private static String schemaUriString;
-  private static String schemaLiteral;
-
-  static {
-    try {
-      String schemaPath = "/testAvroKuduOperationsProducer.avsc";
-      URL schemaUrl = AvroKuduOperationsProducerTest.class.getResource(schemaPath);
-      File schemaFile = Paths.get(schemaUrl.toURI()).toFile();
-      schemaUriString = schemaFile.getAbsoluteFile().toURI().toString();
-      schemaLiteral = Files.asCharSource(schemaFile, UTF_8).read();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  enum SchemaLocation {
-    GLOBAL, URL, LITERAL
-  }
-
-  @Rule
-  public KuduTestHarness harness = new KuduTestHarness();
-
-  @Test
-  public void testEmptyChannel() throws Exception {
-    testEvents(0, SchemaLocation.GLOBAL);
-  }
-
-  @Test
-  public void testOneEvent() throws Exception {
-    testEvents(1, SchemaLocation.GLOBAL);
-  }
-
-  @Test
-  public void testThreeEvents() throws Exception {
-    testEvents(3, SchemaLocation.GLOBAL);
-  }
-
-  @Test
-  public void testThreeEventsSchemaURLInEvent() throws Exception {
-    testEvents(3, SchemaLocation.URL);
-  }
-
-  @Test
-  public void testThreeEventsSchemaLiteralInEvent() throws Exception {
-    testEvents(3, SchemaLocation.LITERAL);
-  }
-
-  private void testEvents(int eventCount, SchemaLocation schemaLocation)
-      throws Exception {
-    KuduTable table = createNewTable(
-        String.format("test%sevents%s", eventCount, schemaLocation));
-    String tableName = table.getName();
-    Context context = schemaLocation != SchemaLocation.GLOBAL ? new Context()
-        : new Context(ImmutableMap.of(PRODUCER_PREFIX + SCHEMA_PROP, schemaUriString));
-    context.put(PRODUCER, AvroKuduOperationsProducer.class.getName());
-
-    List<Event> events = generateEvents(eventCount, schemaLocation);
-
-    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
-
-    List<String> answers = makeAnswers(eventCount);
-    List<String> rows = scanTableToStrings(table);
-    assertEquals("wrong number of rows inserted", answers.size(), rows.size());
-    assertArrayEquals("wrong rows inserted", answers.toArray(), rows.toArray());
-  }
-
-  private KuduTable createNewTable(String tableName) throws Exception {
-    List<ColumnSchema> columns = new ArrayList<>(5);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING)
-        .nullable(true).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("decimalField", Type.DECIMAL)
-        .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
-    CreateTableOptions createOptions =
-        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"))
-            .setNumReplicas(1);
-    return harness.getClient().createTable(tableName, new Schema(columns), createOptions);
-  }
-
-  private List<Event> generateEvents(int eventCount,
-                                     SchemaLocation schemaLocation) throws Exception {
-    List<Event> events = new ArrayList<>();
-    for (int i = 0; i < eventCount; i++) {
-      AvroKuduOperationsProducerTestRecord record = new AvroKuduOperationsProducerTestRecord();
-      record.setKey(10 * i);
-      record.setLongField(2L * i);
-      record.setDoubleField(2.71828 * i);
-      record.setNullableField(i % 2 == 0 ? null : "taco");
-      record.setStringField(String.format("hello %d", i));
-      record.setDecimalField(BigDecimal.valueOf(i, 1));
-      ByteArrayOutputStream out = new ByteArrayOutputStream();
-      Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
-      DatumWriter<AvroKuduOperationsProducerTestRecord> writer =
-          new SpecificDatumWriter<>(AvroKuduOperationsProducerTestRecord.class);
-      writer.write(record, encoder);
-      encoder.flush();
-      Event e = EventBuilder.withBody(out.toByteArray());
-      if (schemaLocation == SchemaLocation.URL) {
-        e.setHeaders(ImmutableMap.of(SCHEMA_URL_HEADER, schemaUriString));
-      } else if (schemaLocation == SchemaLocation.LITERAL) {
-        e.setHeaders(ImmutableMap.of(SCHEMA_LITERAL_HEADER, schemaLiteral));
-      }
-      events.add(e);
-    }
-    return events;
-  }
-
-  private List<String> makeAnswers(int eventCount) {
-    List<String> answers = Lists.newArrayList();
-    for (int i = 0; i < eventCount; i++) {
-      answers.add(String.format(
-          "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " +
-              "STRING nullableField=%s, STRING stringField=hello %s, " +
-              "DECIMAL decimalField(9, 1)=%s",
-          10 * i,
-          2 * i,
-          2.71828 * i,
-          i % 2 == 0 ? "NULL" : "taco",
-          i,
-          BigDecimal.valueOf(i, 1)));
-    }
-    Collections.sort(answers);
-    return answers;
-  }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
deleted file mode 100644
index 58200fc..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.kudu.flume.sink;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT;
-import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.OPERATION_PROP;
-import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.PAYLOAD_COLUMN_DEFAULT;
-import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.test.KuduTestHarness;
-
-public class KeyedKuduOperationsProducerTest {
-  private static final Logger LOG = LoggerFactory.getLogger(KeyedKuduOperationsProducerTest.class);
-
-  @Rule
-  public KuduTestHarness harness = new KuduTestHarness();
-
-  private KuduTable createNewTable(String tableName) throws Exception {
-    LOG.info("Creating new table...");
-
-    ArrayList<ColumnSchema> columns = new ArrayList<>(2);
-    columns.add(
-        new ColumnSchema.ColumnSchemaBuilder(KEY_COLUMN_DEFAULT, Type.STRING)
-            .key(true).build());
-    columns.add(
-        new ColumnSchema.ColumnSchemaBuilder(PAYLOAD_COLUMN_DEFAULT, Type.BINARY)
-            .key(false).build());
-    CreateTableOptions createOptions =
-        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of(KEY_COLUMN_DEFAULT))
-                              .setNumReplicas(1);
-    KuduTable table =
-        harness.getClient().createTable(tableName, new Schema(columns), createOptions);
-
-    LOG.info("Created new table.");
-
-    return table;
-  }
-
-  @Test
-  public void testEmptyChannelWithInsert() throws Exception {
-    testEvents(0, "insert");
-  }
-
-  @Test
-  public void testOneEventWithInsert() throws Exception {
-    testEvents(1, "insert");
-  }
-
-  @Test
-  public void testThreeEventsWithInsert() throws Exception {
-    testEvents(3, "insert");
-  }
-
-  @Test
-  public void testEmptyChannelWithUpsert() throws Exception {
-    testEvents(0, "upsert");
-  }
-
-  @Test
-  public void testOneEventWithUpsert() throws Exception {
-    testEvents(1, "upsert");
-  }
-
-  @Test
-  public void testThreeEventsWithUpsert() throws Exception {
-    testEvents(3, "upsert");
-  }
-
-  @Test
-  public void testDuplicateRowsWithUpsert() throws Exception {
-    LOG.info("Testing events with upsert...");
-
-    KuduTable table = createNewTable("testDupUpsertEvents");
-    String tableName = table.getName();
-    Context ctx = new Context(ImmutableMap.of(
-        PRODUCER_PREFIX + OPERATION_PROP, "upsert",
-        PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()
-    ));
-    KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName, ctx);
-    sink.start();
-
-    int numRows = 3;
-    List<Event> events = new ArrayList<>();
-    for (int i = 0; i < numRows; i++) {
-      Event e = EventBuilder.withBody(String.format("payload body %s", i), UTF_8);
-      e.setHeaders(ImmutableMap.of(KEY_COLUMN_DEFAULT, String.format("key %s", i)));
-      events.add(e);
-    }
-
-    KuduSinkTestUtil.processEvents(sink, events);
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals(numRows + " row(s) expected", numRows, rows.size());
-
-    for (int i = 0; i < numRows; i++) {
-      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
-    }
-
-    Event dup = EventBuilder.withBody("payload body upserted".getBytes(UTF_8));
-    dup.setHeaders(ImmutableMap.of("key", String.format("key %s", 0)));
-
-    KuduSinkTestUtil.processEvents(sink, ImmutableList.of(dup));
-
-    List<String> upRows = scanTableToStrings(table);
-    assertEquals(numRows + " row(s) expected", numRows, upRows.size());
-
-    assertTrue("incorrect payload", upRows.get(0).contains("payload body upserted"));
-    for (int i = 1; i < numRows; i++) {
-      assertTrue("incorrect payload", upRows.get(i).contains("payload body " + i));
-    }
-
-    LOG.info("Testing events with upsert finished successfully.");
-  }
-
-  private void testEvents(int eventCount, String operation) throws Exception {
-    LOG.info("Testing {} events...", eventCount);
-
-    KuduTable table = createNewTable("test" + eventCount + "events" + operation);
-    String tableName = table.getName();
-    Context context = new Context(ImmutableMap.of(
-        PRODUCER_PREFIX + OPERATION_PROP, operation,
-        PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()
-    ));
-
-    List<Event> events = getEvents(eventCount);
-
-    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
-
-    for (int i = 0; i < eventCount; i++) {
-      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
-    }
-
-    LOG.info("Testing {} events finished successfully.", eventCount);
-  }
-
-  private List<Event> getEvents(int eventCount) {
-    List<Event> events = new ArrayList<>();
-    for (int i = 0; i < eventCount; i++) {
-      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
-      e.setHeaders(ImmutableMap.of("key", String.format("key %s", i)));
-      events.add(e);
-    }
-    return events;
-  }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
deleted file mode 100644
index 89b8c1a..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.kudu.flume.sink;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Sink;
-import org.apache.flume.Sink.Status;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.test.KuduTestHarness;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-
-public class KuduSinkTest {
-  private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
-
-  @Rule
-  public KuduTestHarness harness = new KuduTestHarness();
-
-  private KuduTable createNewTable(String tableName) throws Exception {
-    LOG.info("Creating new table...");
-
-    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
-    CreateTableOptions createOptions =
-        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
-                                .setNumReplicas(1);
-    KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions);
-
-    LOG.info("Created new table.");
-
-    return table;
-  }
-
-  @Test
-  public void testMandatoryParameters() {
-    LOG.info("Testing mandatory parameters...");
-
-    KuduSink sink = new KuduSink(harness.getClient());
-
-    HashMap<String, String> parameters = new HashMap<>();
-    Context context = new Context(parameters);
-    try {
-      Configurables.configure(sink, context);
-      Assert.fail("Should have failed due to missing properties");
-    } catch (NullPointerException npe) {
-        //good
-    }
-
-    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "tableName");
-    context = new Context(parameters);
-    try {
-      Configurables.configure(sink, context);
-      Assert.fail("Should have failed due to missing properties");
-    } catch (NullPointerException npe) {
-        //good
-    }
-
-    LOG.info("Testing mandatory parameters finished successfully.");
-  }
-
-  @Test(expected = FlumeException.class)
-  public void testMissingTable() {
-    LOG.info("Testing missing table...");
-
-    KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), "missingTable", new Context());
-    sink.start();
-
-    LOG.info("Testing missing table finished successfully.");
-  }
-
-  @Test
-  public void testEmptyChannelWithDefaults() throws Exception {
-    testEventsWithDefaults(0);
-  }
-
-  @Test
-  public void testOneEventWithDefaults() throws Exception {
-    testEventsWithDefaults(1);
-  }
-
-  @Test
-  public void testThreeEventsWithDefaults() throws Exception {
-    testEventsWithDefaults(3);
-  }
-
-  @Test
-  public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
-    doTestDuplicateRows(true);
-  }
-
-  @Test
-  public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
-    doTestDuplicateRows(false);
-  }
-
-  private void doTestDuplicateRows(boolean ignoreDuplicateRows) throws Exception {
-    KuduTable table = createNewTable("testDuplicateRows" + ignoreDuplicateRows);
-    String tableName = table.getName();
-    Context sinkContext = new Context();
-    sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
-                    Boolean.toString(ignoreDuplicateRows));
-    KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName, sinkContext);
-    sink.start();
-    Channel channel = sink.getChannel();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
-    for (int i = 0; i < 2; i++) {
-      Event e = EventBuilder.withBody("key-0", UTF_8); // Duplicate keys.
-      channel.put(e);
-    }
-
-    tx.commit();
-    tx.close();
-
-    try {
-      Sink.Status status = sink.process();
-      if (!ignoreDuplicateRows) {
-        fail("Incorrectly ignored duplicate rows!");
-      }
-      assertSame("incorrect status after processing duplicates", status, Status.READY);
-    } catch (EventDeliveryException e) {
-      if (ignoreDuplicateRows) {
-        throw new AssertionError("Failed to ignore duplicate rows!", e);
-      } else {
-        LOG.info("Correctly did not ignore duplicate rows", e);
-        return;
-      }
-    }
-
-    // We only get here if the process() succeeded.
-    try {
-      List<String> rows = scanTableToStrings(table);
-      assertEquals("1 row expected", 1, rows.size());
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    LOG.info("Testing duplicate events finished successfully.");
-  }
-
-  private void testEventsWithDefaults(int eventCount) throws Exception {
-    LOG.info("Testing {} events...", eventCount);
-
-    KuduTable table = createNewTable("test" + eventCount + "events");
-    String tableName = table.getName();
-
-    List<Event> events = new ArrayList<>();
-
-    for (int i = 0; i < eventCount; i++) {
-      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
-      events.add(e);
-    }
-
-    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), new Context(), tableName, events);
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
-
-    for (int i = 0; i < eventCount; i++) {
-      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
-    }
-
-    LOG.info("Testing {} events finished successfully.", eventCount);
-  }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java
deleted file mode 100644
index 3f5cfbf..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.flume.sink;
-
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_KEYTAB;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-
-import java.util.List;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink.Status;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.client.KuduClient;
-
-class KuduSinkTestUtil {
-  private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTestUtil.class);
-
-  static KuduSink createSink(KuduClient client, String tableName, Context ctx) {
-    return createSink(tableName, client, ctx, client.getMasterAddressesAsString());
-  }
-
-  private static KuduSink createSink(
-      String tableName, KuduClient client, Context ctx, String masterAddresses) {
-    LOG.info("Creating Kudu sink for '{}' table...", tableName);
-
-    Context context = new Context();
-    context.put(TABLE_NAME, tableName);
-    context.put(MASTER_ADDRESSES, masterAddresses);
-    context.putAll(ctx.getParameters());
-    KuduSink sink = new KuduSink(client);
-    Configurables.configure(sink, context);
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-
-    LOG.info("Created Kudu sink for '{}' table.", tableName);
-
-    return sink;
-  }
-
-  static KuduSink createSecureSink(String tableName, String masterAddresses, String clusterRoot) {
-    Context context = new Context();
-    context.put(KERBEROS_KEYTAB, clusterRoot + "/krb5kdc/test-user.keytab");
-    context.put(KERBEROS_PRINCIPAL, "test-user@KRBTEST.COM");
-
-    return createSink(tableName, null, context, masterAddresses);
-  }
-
-  static void processEventsCreatingSink(
-      KuduClient syncClient, Context context, String tableName, List<Event> events
-      ) throws EventDeliveryException {
-    KuduSink sink = createSink(syncClient, tableName, context);
-    sink.start();
-    processEvents(sink, events);
-  }
-
-  static void processEvents(KuduSink sink, List<Event> events) throws EventDeliveryException {
-    Channel channel = sink.getChannel();
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (Event e : events) {
-      channel.put(e);
-    }
-    tx.commit();
-    tx.close();
-
-    Status status = sink.process();
-    if (events.isEmpty()) {
-      assertSame("incorrect status for empty channel", status, Status.BACKOFF);
-    } else {
-      assertNotSame("incorrect status for non-empty channel", status, Status.BACKOFF);
-    }
-  }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
deleted file mode 100644
index 37e33c2..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.flume.sink;
-
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.BAD_COLUMN_VALUE_POLICY_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.MISSING_COLUMN_POLICY_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.OPERATION_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.PATTERN_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.SKIP_BAD_COLUMN_VALUE_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.SKIP_MISSING_COLUMN_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.UNMATCHED_ROW_POLICY_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.WARN_UNMATCHED_ROWS_PROP;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.Closeable;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.flume.Context;
-import org.apache.flume.FlumeException;
-import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.test.KuduTestHarness;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.RuleChain;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.test.CapturingLogAppender;
-
-public class RegexpKuduOperationsProducerParseErrorTest {
-  private static final String TEST_REGEXP = "(?<key>\\d+),(?<byteFld>\\d+),(?<stringFld>\\w+)";
-  private static final String TEST_REGEXP_MISSING_COLUMN = "(?<key>\\d+),(?<byteFld>\\d+)";
-  private static final String TEST_OPERATION = "insert";
-
-  private static final String ROW_UNMATCHING = "invalid row";
-  private static final String ROW_BAD_COLUMN_VALUE = "1,1000,string";
-  private static final String ROW_MISSING_COLUMN = "1,1";
-
-  private static final String ERROR_MSG_UNMATCHED_ROW =
-      "Failed to match the pattern '" + TEST_REGEXP + "' in '" + ROW_UNMATCHING + "'";
-  private static final String ERROR_MSG_MISSING_COLUMN =
-      "Column 'stringFld' has no matching group in '" + ROW_MISSING_COLUMN + "'";
-  private static final String ERROR_MSG_BAD_COLUMN_VALUE =
-      "Raw value '" + ROW_BAD_COLUMN_VALUE +
-        "' couldn't be parsed to type Type: int8 for column 'byteFld'";
-
-  private static final String POLICY_REJECT = "REJECT";
-  private static final String POLICY_WARN = "WARN";
-  private static final String POLICY_IGNORE = "IGNORE";
-
-  public KuduTestHarness harness = new KuduTestHarness();
-  public ExpectedException thrown = ExpectedException.none();
-
-  // ExpectedException misbehaves when combined with other rules; we use a
-  // RuleChain to beat it into submission.
-  //
-  // See https://stackoverflow.com/q/28846088 for more information.
-  @Rule
-  public RuleChain chain = RuleChain.outerRule(harness).around(thrown);
-
-  @Test
-  public void testMissingColumnThrowsExceptionDefaultConfig() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
-    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
-  }
-
-  @Test
-  public void testMissingColumnThrowsExceptionDeprecated() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
-    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(false));
-    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
-  }
-
-  @Test
-  public void testMissingColumnThrowsException() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
-    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_REJECT);
-    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
-  }
-
-  @Test
-  public void testMissingColumnLogsWarningDeprecated() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
-    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(true));
-    testLogging(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
-  }
-
-  @Test
-  public void testMissingColumnLogsWarning() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
-    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_WARN);
-    testLogging(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
-  }
-
-
-  @Test
-  public void testMissingColumnIgnored() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
-    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_IGNORE);
-    testIgnored(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testMissingColumnConfigValidation() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(false));
-    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_IGNORE);
-    getProducer(additionalContext);
-  }
-
-  @Test
-  public void testBadColumnValueThrowsExceptionDefaultConfig() throws Exception {
-    Context additionalContext = new Context();
-    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
-  }
-
-  @Test
-  public void testBadColumnValueThrowsExceptionDeprecated() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(false));
-    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
-  }
-
-  @Test
-  public void testBadColumnValueThrowsException() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_REJECT);
-    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
-  }
-
-  @Test
-  public void testBadColumnValueLogsWarningDeprecated() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(true));
-    testLogging(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
-  }
-
-  @Test
-  public void testBadColumnValueLogsWarning() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_WARN);
-    testLogging(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
-  }
-
-  @Test
-  public void testBadColumnValueIgnored() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_IGNORE);
-    testIgnored(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testBadColumnValueConfigValidation() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(false));
-    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_IGNORE);
-    getProducer(additionalContext);
-  }
-
-  @Test
-  public void testUnmatchedRowLogsWarningWithDefaultConfig() throws Exception {
-    Context additionalContext = new Context();
-    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
-  }
-
-  @Test
-  public void testUnmatchedRowThrowsException() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_REJECT);
-    testThrowsException(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
-  }
-
-  @Test
-  public void testUnmatchedRowLogsWarningDeprecated() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(true));
-    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
-  }
-
-  @Test
-  public void testUnmatchedRowLogsWarning() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_WARN);
-    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
-  }
-
-  @Test
-  public void testUnmatchedRowIgnoredDeprecated() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(false));
-    testIgnored(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
-  }
-
-  @Test
-  public void testUnmatchedRowIgnored() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_IGNORE);
-    testIgnored(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testUnmatchedRowConfigValidation() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(false));
-    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_IGNORE);
-    getProducer(additionalContext);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testUnKnownPolicyConfigValidation() throws Exception {
-    Context additionalContext = new Context();
-    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, "FORCED");
-    getProducer(additionalContext);
-  }
-
-  private void testLogging(
-      Context additionalContext, String expectedError, String eventBody) throws Exception {
-    String appendedText = processEvent(additionalContext, eventBody);
-    assertTrue(appendedText.contains(expectedError));
-  }
-
-  private void testIgnored(
-      Context additionalContext, String expectedError, String eventBody) throws Exception {
-    String appendedText = processEvent(additionalContext, eventBody);
-    assertFalse(appendedText.contains(expectedError));
-  }
-
-  private void testThrowsException(
-      Context additionalContext, String expectedError, String eventBody) throws Exception {
-    thrown.expect(FlumeException.class);
-    thrown.expectMessage(expectedError);
-    processEvent(additionalContext, eventBody);
-  }
-
-  private String processEvent(Context additionalContext, String eventBody) throws Exception {
-    CapturingLogAppender appender = new CapturingLogAppender();
-    RegexpKuduOperationsProducer producer = getProducer(additionalContext);
-    try (Closeable c = appender.attach()) {
-      producer.getOperations(EventBuilder.withBody(eventBody.getBytes(Charset.forName("UTF-8"))));
-    }
-    return appender.getAppendedText();
-  }
-
-
-  private RegexpKuduOperationsProducer getProducer(Context additionalContext) throws Exception {
-    RegexpKuduOperationsProducer producer = new RegexpKuduOperationsProducer();
-    producer.initialize(createNewTable("test"));
-    Context context = new Context();
-    context.put(PATTERN_PROP, TEST_REGEXP);
-    context.put(OPERATION_PROP, TEST_OPERATION);
-    context.putAll(additionalContext.getParameters());
-    producer.configure(context);
-
-    return producer;
-  }
-
-  private KuduTable createNewTable(String tableName) throws Exception {
-    ArrayList<ColumnSchema> columns = new ArrayList<>(10);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("byteFld", Type.INT8).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
-    CreateTableOptions createOptions =
-        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
-    KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions);
-    return table;
-  }
-
-
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
deleted file mode 100644
index f063349..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.kudu.flume.sink;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.OPERATION_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.PATTERN_PROP;
-import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.test.KuduTestHarness;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.util.DecimalUtil;
-
-public class RegexpKuduOperationsProducerTest {
-  private static final String TEST_REGEXP =
-      "(?<key>\\d+),(?<byteFld>\\d+),(?<shortFld>\\d+),(?<intFld>\\d+)," +
-      "(?<longFld>\\d+),(?<binaryFld>\\w+),(?<stringFld>\\w+),(?<boolFld>\\w+)," +
-      "(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+.\\d*),(?<decimalFld>\\d+.\\d*)";
-
-  @Rule
-  public KuduTestHarness harness = new KuduTestHarness();
-
-  private KuduTable createNewTable(String tableName) throws Exception {
-    ArrayList<ColumnSchema> columns = new ArrayList<>(10);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("byteFld", Type.INT8).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("shortFld", Type.INT16).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("intFld", Type.INT32).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("longFld", Type.INT64).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("binaryFld", Type.BINARY).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("boolFld", Type.BOOL).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("floatFld", Type.FLOAT).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleFld", Type.DOUBLE).build());
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("decimalFld", Type.DECIMAL)
-        .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
-    CreateTableOptions createOptions =
-        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
-    return harness.getClient().createTable(tableName, new Schema(columns), createOptions);
-  }
-
-  @Test
-  public void testEmptyChannel() throws Exception {
-    testEvents(0, 1, "insert");
-  }
-
-  @Test
-  public void testOneEvent() throws Exception {
-    testEvents(1, 1, "insert");
-  }
-
-  @Test
-  public void testThreeEvents() throws Exception {
-    testEvents(3, 1, "insert");
-  }
-
-  @Test
-  public void testThreeEventsWithUpsert() throws Exception {
-    testEvents(3, 1, "upsert");
-  }
-
-  @Test
-  public void testOneEventTwoRowsEach() throws Exception {
-    testEvents(1, 2, "insert");
-  }
-
-  @Test
-  public void testTwoEventsTwoRowsEach() throws Exception {
-    testEvents(2, 2, "insert");
-  }
-
-  @Test
-  public void testTwoEventsTwoRowsEachWithUpsert() throws Exception {
-    testEvents(2, 2, "upsert");
-  }
-
-  private void testEvents(int eventCount, int perEventRowCount, String operation) throws Exception {
-    String tableName = String.format("test%sevents%srowseach%s",
-        eventCount, perEventRowCount, operation);
-    Context context = new Context();
-    context.put(PRODUCER_PREFIX + PATTERN_PROP, TEST_REGEXP);
-    context.put(PRODUCER_PREFIX + OPERATION_PROP, operation);
-    context.put(PRODUCER, RegexpKuduOperationsProducer.class.getName());
-    KuduTable table = createNewTable(tableName);
-
-    List<Event> events = generateEvents(eventCount, perEventRowCount, operation);
-
-    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals(eventCount * perEventRowCount + " row(s) expected",
-        eventCount * perEventRowCount,
-        rows.size());
-
-    ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount);
-    for (int i = 0; i < eventCount; i++) {
-      for (int j = 0; j < perEventRowCount; j++) {
-        int value = operation.equals("upsert") && i == 0 ? 1 : i;
-        String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " +
-            "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " +
-            "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " +
-            "DOUBLE doubleFld=%1$d.%1$d, DECIMAL decimalFld(9, 1)=%1$d.%1$d";
-        String rightAnswer = String.format(baseAnswer, value, i, j);
-        rightAnswers.add(rightAnswer);
-      }
-    }
-    Collections.sort(rightAnswers);
-
-    for (int k = 0; k < eventCount * perEventRowCount; k++) {
-      assertEquals("incorrect row", rightAnswers.get(k), rows.get(k));
-    }
-  }
-
-  private List<Event> generateEvents(int eventCount, int perEventRowCount, String operation) {
-    List<Event> events = new ArrayList<>();
-
-    for (int i = 0; i < eventCount; i++) {
-      StringBuilder payload = new StringBuilder();
-      for (int j = 0; j < perEventRowCount; j++) {
-        String baseRow = "|1%1$d%2$d1,%1$d,%1$d,%1$d,%1$d,binary," +
-            "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d,%1$d|";
-        String row = String.format(baseRow, i, j);
-        payload.append(row);
-      }
-      Event e = EventBuilder.withBody(payload.toString().getBytes(UTF_8));
-      events.add(e);
-    }
-
-    if (eventCount > 0) {
-      // In the upsert case, add one upsert row per insert event (i.e. per i)
-      // All such rows go in one event.
-      if (operation.equals("upsert")) {
-        StringBuilder upserts = new StringBuilder();
-        for (int j = 0; j < perEventRowCount; j++) {
-          String row = String.format("|1%2$d%3$d1,%1$d,%1$d,%1$d,%1$d,binary," +
-              "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d,%1$d|", 1, 0, j);
-          upserts.append(row);
-        }
-        Event e = EventBuilder.withBody(upserts.toString().getBytes(UTF_8));
-        events.add(e);
-      }
-
-      // Also check some bad/corner cases.
-      String mismatchInInt = "|1,2,taco,4,5,x,y,true,1.0.2.0,999|";
-      String emptyString = "";
-      String[] testCases = {mismatchInInt, emptyString};
-      for (String testCase : testCases) {
-        Event e = EventBuilder.withBody(testCase.getBytes(UTF_8));
-        events.add(e);
-      }
-    }
-    return events;
-  }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
deleted file mode 100644
index 11aa07b..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
-import static org.apache.kudu.util.SecurityUtil.KUDU_TICKETCACHE_PROPERTY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.test.KuduTestHarness;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
-
-public class SecureKuduSinkTest {
-  private static final Logger LOG = LoggerFactory.getLogger(SecureKuduSinkTest.class);
-  private static final int TICKET_LIFETIME_SECONDS = 20;
-  private static final int RENEWABLE_LIFETIME_SECONDS = 35;
-
-  private static final MiniKuduClusterBuilder clusterBuilder = KuduTestHarness.getBaseClusterBuilder()
-      .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s")
-      .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s")
-      .enableKerberos();
-
-  @Rule
-  public KuduTestHarness harness = new KuduTestHarness(clusterBuilder);
-
-  @Before
-  public void clearTicketCacheProperty() {
-    // Let Flume authenticate.
-    System.clearProperty(KUDU_TICKETCACHE_PROPERTY);
-  }
-
-  @Test
-  public void testEventsWithShortTickets() throws Exception {
-    Instant start = Instant.now();
-    LOG.info("Creating new table...");
-    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
-    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
-    CreateTableOptions createOptions =
-        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
-        .setNumReplicas(1);
-    String tableName = "test_long_lived_events";
-    KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions);
-    LOG.info("Created new table.");
-
-    KuduSink sink = KuduSinkTestUtil.createSecureSink(
-        tableName, harness.getMasterAddressesAsString(), harness.getClusterRoot());
-    sink.start();
-
-    LOG.info("Testing events at the beginning.");
-    int eventCount = 10;
-
-    processEvents(sink, 0, eventCount / 2);
-
-    LOG.info("Waiting for tickets to expire");
-    Duration elapsedSoFar = Duration.between(start, Instant.now());
-    TimeUnit.MILLISECONDS.sleep(1000 * (RENEWABLE_LIFETIME_SECONDS + 1) - elapsedSoFar.toMillis());
-    // At this point, the ticket will have been outstanding for at least
-    // (RENEWABLE_LIFETIME_SECONDS + 1) seconds-- so the sink will need to reacquire a ticket.
-
-    LOG.info("Testing events after ticket renewal.");
-    processEvents(sink, eventCount / 2, eventCount);
-
-    List<String> rows = scanTableToStrings(table);
-    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
-
-    for (int i = 0; i < eventCount; i++) {
-      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
-    }
-
-    LOG.info("Testing {} events finished successfully.", eventCount);
-  }
-
-  private void processEvents(KuduSink sink, int from, int to) throws EventDeliveryException {
-    List<Event> events = new ArrayList<>();
-    for (int i = from; i < to; i++) {
-      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
-      events.add(e);
-    }
-
-    KuduSinkTestUtil.processEvents(sink, events);
-    LOG.info("Events flushed.");
-  }
-}
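
The KuduSinkTestUtil helper referenced above is removed elsewhere in this patch and is not shown
in this hunk. For readers porting these tests to the Flume-hosted module, a minimal sketch of the
same wiring follows; it is an illustration only, assuming the removed module's "masterAddresses"
and "tableName" configuration keys and a public no-arg KuduSink constructor, not the removed
helper's actual code.

    import static java.nio.charset.StandardCharsets.UTF_8;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.Transaction;
    import org.apache.flume.channel.MemoryChannel;
    import org.apache.flume.conf.Configurables;
    import org.apache.flume.event.EventBuilder;
    import org.apache.kudu.flume.sink.KuduSink;

    public class KuduSinkWiringSketch {
      public static void main(String[] args) throws Exception {
        // Configure the sink against a hypothetical single-master cluster and the
        // table created by the test above. A reachable Kudu cluster is assumed.
        Context sinkContext = new Context();
        sinkContext.put("masterAddresses", "localhost:7051");   // assumed key and address
        sinkContext.put("tableName", "test_long_lived_events"); // assumed key

        KuduSink sink = new KuduSink();
        Configurables.configure(sink, sinkContext);

        // Back the sink with an in-memory channel.
        MemoryChannel channel = new MemoryChannel();
        Configurables.configure(channel, new Context());
        sink.setChannel(channel);

        channel.start();
        sink.start();

        // Put one event on the channel, then let the sink drain and flush it to Kudu.
        Transaction tx = channel.getTransaction();
        tx.begin();
        Event event = EventBuilder.withBody("payload body 0".getBytes(UTF_8));
        channel.put(event);
        tx.commit();
        tx.close();

        sink.process();

        sink.stop();
        channel.stop();
      }
    }
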
diff --git a/java/kudu-flume-sink/src/test/resources/log4j2.properties b/java/kudu-flume-sink/src/test/resources/log4j2.properties
deleted file mode 100644
index 22762a1..0000000
--- a/java/kudu-flume-sink/src/test/resources/log4j2.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-status = error
-name = PropertiesConfig
-appenders = console
-
-appender.console.type = Console
-appender.console.name = STDOUT
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n
-
-rootLogger.level = info
-rootLogger.appenderRefs = stdout
-rootLogger.appenderRef.stdout.ref = STDOUT
-
-logger.kudu.name = org.apache.kudu
-logger.kudu.level = debug
diff --git a/java/kudu-flume-sink/src/test/resources/testAvroKuduOperationsProducer.avsc b/java/kudu-flume-sink/src/test/resources/testAvroKuduOperationsProducer.avsc
deleted file mode 100644
index 6bcf6d2..0000000
--- a/java/kudu-flume-sink/src/test/resources/testAvroKuduOperationsProducer.avsc
+++ /dev/null
@@ -1,14 +0,0 @@
-{"namespace": "org.apache.kudu.flume.sink",
-  "type": "record",
-  "name": "AvroKuduOperationsProducerTestRecord",
-  "fields": [
-    {"name": "key", "type": "int"},
-    {"name": "longField",  "type": "long"},
-    {"name": "doubleField",  "type": "double"},
-    {"name": "nullableField",  "type": ["string", "null"]},
-    {"name": "stringField", "type": "string"},
-    {"name": "decimalField", "type": {
-        "type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 1}
-    }
-  ]
-}
\ No newline at end of file
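
The removed AvroKuduOperationsProducer consumed Flume events whose bodies carried Avro-encoded
records matching a schema like the one above. The sketch below shows how such an event body can be
built with stock Avro APIs, assuming plain binary encoding (no container file); the schema file
location, field values, and decimal handling are illustrative, not the removed test's exact code.

    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.math.BigDecimal;
    import java.nio.ByteBuffer;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    public class AvroEventBodySketch {
      public static Event buildEvent() throws Exception {
        // Assumes the schema file is available on the local path.
        Schema schema = new Schema.Parser()
            .parse(new File("testAvroKuduOperationsProducer.avsc"));

        GenericRecord record = new GenericData.Record(schema);
        record.put("key", 1);
        record.put("longField", 2L);
        record.put("doubleField", 3.0);
        record.put("nullableField", null);
        record.put("stringField", "string value");
        // decimal(9,1): the bytes carry the unscaled value, so "1.5" is encoded as 15.
        record.put("decimalField",
            ByteBuffer.wrap(new BigDecimal("1.5").unscaledValue().toByteArray()));

        // Binary-encode the record; the encoded bytes become the Flume event body.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        return EventBuilder.withBody(out.toByteArray());
      }
    }
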
diff --git a/java/settings.gradle b/java/settings.gradle
index 8a8bcfc..a89c703 100644
--- a/java/settings.gradle
+++ b/java/settings.gradle
@@ -24,7 +24,6 @@ include "kudu-backup-common"
 include "kudu-backup-tools"
 include "kudu-client"
 include "kudu-client-tools"
-include "kudu-flume-sink"
 include "kudu-hive"
 include "kudu-jepsen"
 include "kudu-mapreduce"