You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/03/30 03:54:31 UTC

incubator-kudu git commit: Add Kudu Flume sink.

Repository: incubator-kudu
Updated Branches:
  refs/heads/master c8598ff04 -> fa70abe85


Add Kudu Flume sink.

Implement a Flume sink to write data to Kudu.

Change-Id: I53e02580908ba2468b216543719ebe5011a267c3
Reviewed-on: http://gerrit.cloudera.org:8080/2600
Reviewed-by: Mike Percy <mp...@apache.org>
Reviewed-by: Jean-Daniel Cryans
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/fa70abe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/fa70abe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/fa70abe8

Branch: refs/heads/master
Commit: fa70abe8559b4d8ed9438aae1c3e1027c8662997
Parents: c8598ff
Author: arae <ar...@gmail.com>
Authored: Tue Mar 1 16:45:51 2016 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Wed Mar 30 01:53:59 2016 +0000

----------------------------------------------------------------------
 java/kudu-flume-sink/pom.xml                    |  68 +++++
 .../kududb/flume/sink/KuduEventProducer.java    |  59 ++++
 .../java/org/kududb/flume/sink/KuduSink.java    | 267 +++++++++++++++++++
 .../sink/KuduSinkConfigurationConstants.java    |  67 +++++
 .../flume/sink/SimpleKuduEventProducer.java     |  81 ++++++
 .../org/kududb/flume/sink/KuduSinkTest.java     | 243 +++++++++++++++++
 java/pom.xml                                    |   1 +
 7 files changed, 786 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/pom.xml b/java/kudu-flume-sink/pom.xml
new file mode 100644
index 0000000..e7d796b
--- /dev/null
+++ b/java/kudu-flume-sink/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>kudu-parent</artifactId>
+    <groupId>org.kududb</groupId>
+    <version>0.8.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>kudu-flume-sink</artifactId>
+  <name>Kudu Flume NG Sink</name>
+
+  <properties>
+    <flume.version>1.6.0</flume.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <version>${flume.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <version>${flume.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+      <version>${flume.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kududb</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.kududb</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java
new file mode 100644
index 0000000..7166300
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kududb.flume.sink;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+
+import java.util.List;
+
+/**
+ * Interface for an event producer which turns a Flume event (headers and
+ * body) into the Kudu Operations that write it to a Kudu table. Producers are
+ * configurable: any parameters they need are supplied via {@code configure}.
+ * The columns written must exist in the table configured for the KuduSink.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface KuduEventProducer extends Configurable, ConfigurableComponent {
+  /**
+   * Initialize the event producer with the next event to convert.
+   * @param event the event to be written to Kudu
+   * @param table the KuduTable object used for creating Kudu Operation objects
+   */
+  void initialize(Event event, KuduTable table);
+
+  /**
+   * Get the operations that should be written out to Kudu as a result of the
+   * event passed to {@code initialize}; the sink applies them via the Kudu client API.
+   * @return List of {@link org.kududb.client.Operation} which
+   * are written as such to Kudu
+   */
+  List<Operation> getOperations();
+
+  /**
+   * Clean up any state. Intended to be called when the sink is being stopped.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java
new file mode 100644
index 0000000..917c0b3
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kududb.flume.sink;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduSession;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+import org.kududb.client.OperationResponse;
+import org.kududb.client.SessionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A simple sink which reads events from a channel and writes them to Kudu.
+ * To use this sink, it has to be configured with certain mandatory parameters:<p>
+ * <tt>tableName: </tt> The name of the table in Kudu to write to. <p>
+ * <tt>masterAddresses: </tt> Comma-separated list of "host:port" pairs of the masters (port optional). <p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduSink extends AbstractSink implements Configurable {
+  private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
+  private static final Long DEFAULT_BATCH_SIZE = 100L;
+  private static final Long DEFAULT_TIMEOUT_MILLIS =
+          AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+  private static final String DEFAULT_KUDU_EVENT_PRODUCER =
+          "org.kududb.flume.sink.SimpleKuduEventProducer";
+  private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
+
+  private String masterAddresses;
+  private String tableName;
+  private long batchSize;
+  private long timeoutMillis;
+  private boolean ignoreDuplicateRows;
+  private KuduTable table;
+  private KuduSession session;
+  private KuduClient client;
+  private KuduEventProducer eventProducer;
+  private String eventProducerType;
+  private Context producerContext;
+  private SinkCounter sinkCounter;
+
+  public KuduSink() {
+    this(null);
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public KuduSink(KuduClient kuduClient) {
+    this.client = kuduClient;
+  }
+
+  @Override
+  public void start() {
+    Preconditions.checkState(table == null && session == null, "Please call stop " +
+        "before calling start on an old instance.");
+
+    // A client is injected only by tests; otherwise build one from masterAddresses.
+    if (client == null) {
+      client = new KuduClient.KuduClientBuilder(masterAddresses).build();
+    }
+    session = client.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    session.setTimeoutMillis(timeoutMillis);
+    session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
+
+    try {
+      table = client.openTable(tableName);
+    } catch (Exception e) {
+      sinkCounter.incrementConnectionFailedCount();
+      String msg = String.format("Could not open table '%s' from Kudu", tableName);
+      logger.error(msg, e);
+      throw new FlumeException(msg, e);
+    }
+
+    super.start();
+    sinkCounter.incrementConnectionCreatedCount();
+    sinkCounter.start();
+  }
+
+  @Override
+  public void stop() {
+    try {
+      if (client != null) {
+        client.shutdown();
+      }
+      client = null;
+      table = null;
+      session = null;
+    } catch (Exception e) {
+      throw new FlumeException("Error closing client.", e);
+    }
+    sinkCounter.incrementConnectionClosedCount();
+    sinkCounter.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void configure(Context context) {
+    masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
+    tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
+
+    batchSize = context.getLong(
+            KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE);
+    timeoutMillis = context.getLong(
+            KuduSinkConfigurationConstants.TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
+    ignoreDuplicateRows = context.getBoolean(
+            KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
+    eventProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER);
+
+    Preconditions.checkNotNull(masterAddresses,
+        "Master address cannot be empty, please specify '" +
+                KuduSinkConfigurationConstants.MASTER_ADDRESSES +
+                "' in configuration file");
+    Preconditions.checkNotNull(tableName,
+        "Table name cannot be empty, please specify '" +
+                KuduSinkConfigurationConstants.TABLE_NAME +
+                "' in configuration file");
+
+    // Fall back to the default event producer if none (or an empty name) was configured.
+    if (eventProducerType == null || eventProducerType.isEmpty()) {
+      eventProducerType = DEFAULT_KUDU_EVENT_PRODUCER;
+      logger.info("No Kudu event producer defined, will use default");
+    }
+
+    producerContext = new Context();
+    producerContext.putAll(context.getSubProperties(
+            KuduSinkConfigurationConstants.PRODUCER_PREFIX));
+
+    try {
+      Class<? extends KuduEventProducer> clazz =
+          (Class<? extends KuduEventProducer>)
+          Class.forName(eventProducerType);
+      eventProducer = clazz.newInstance();
+      eventProducer.configure(producerContext);
+    } catch (Exception e) {
+      logger.error("Could not instantiate Kudu event producer." , e);
+      Throwables.propagate(e);
+    }
+    sinkCounter = new SinkCounter(this.getName());
+  }
+
+  public KuduClient getClient() {
+    return client;
+  }
+
+  @Override
+  public Status process() throws EventDeliveryException {
+    if (session.hasPendingOperations()) {
+      // If for whatever reason we have pending operations then just refuse to process
+      // and tell caller to try again a bit later. We don't want to pile on the kudu
+      // session object.
+      return Status.BACKOFF;
+    }
+
+    Channel channel = getChannel();
+    Transaction txn = channel.getTransaction();
+
+    txn.begin();
+
+    try {
+      long txnEventCount = 0;
+      for (; txnEventCount < batchSize; txnEventCount++) {
+        Event event = channel.take();
+        if (event == null) {
+          break;
+        }
+
+        eventProducer.initialize(event, table);
+        List<Operation> operations = eventProducer.getOperations();
+        for (Operation o : operations) {
+          session.apply(o);
+        }
+      }
+
+      logger.debug("Flushing {} events", txnEventCount);
+      List<OperationResponse> responses = session.flush();
+      if (responses != null) {
+        for (OperationResponse response : responses) {
+          // Only throw an EventDeliveryException if at least one of the responses was
+          // not a row error. Row errors can occur for example when an event is inserted
+          // into Kudu successfully as part of a transaction but the transaction is
+          // rolled back and a subsequent replay of the Flume transaction leads to a
+          // row error due to the fact that the row already exists (Kudu doesn't support
+          // "insert or overwrite" semantics yet).
+          if (response.hasRowError()) {
+            throw new EventDeliveryException("Failed to flush one or more changes. " +
+                "Transaction rolled back: " + response.getRowError().toString());
+          }
+        }
+      }
+
+      if (txnEventCount == 0) {
+        sinkCounter.incrementBatchEmptyCount();
+      } else if (txnEventCount == batchSize) {
+        sinkCounter.incrementBatchCompleteCount();
+      } else {
+        sinkCounter.incrementBatchUnderflowCount();
+      }
+
+      txn.commit();
+
+      if (txnEventCount == 0) {
+        return Status.BACKOFF;
+      }
+
+      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+      return Status.READY;
+
+    } catch (Throwable e) {
+      txn.rollback();
+
+      String msg = "Failed to commit transaction. Transaction rolled back.";
+      logger.error(msg, e);
+      if (e instanceof Error || e instanceof RuntimeException) {
+        Throwables.propagate(e);
+      } else {
+        logger.error(msg, e);
+        throw new EventDeliveryException(msg, e);
+      }
+    } finally {
+      txn.close();
+    }
+
+    return Status.BACKOFF;
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  KuduEventProducer getEventProducer() {
+    return eventProducer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java
new file mode 100644
index 0000000..6486137
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kududb.flume.sink;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+/**
+ * Constants used for configuration of KuduSink
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduSinkConfigurationConstants {
+  /**
+   * Comma-separated list of "host:port" pairs of the masters (port optional).
+   */
+  public static final String MASTER_ADDRESSES = "masterAddresses";
+
+  /**
+   * The name of the table in Kudu to write to.
+   */
+  public static final String TABLE_NAME = "tableName";
+
+  /**
+   * The fully qualified class name of the Kudu event producer the sink should use.
+   */
+  public static final String PRODUCER = "producer";
+
+  /**
+   * Configuration to pass to the Kudu event producer.
+   */
+  public static final String PRODUCER_PREFIX = PRODUCER + ".";
+
+  /**
+   * Maximum number of events the sink should take from the channel per
+   * transaction, if available.
+   */
+  public static final String BATCH_SIZE = "batchSize";
+
+  /**
+   * Timeout period for Kudu operations, in milliseconds.
+   */
+  public static final String TIMEOUT_MILLIS = "timeoutMillis";
+
+  /**
+   * Whether to ignore errors indicating that we attempted to insert duplicate rows into Kudu.
+   */
+  public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java
new file mode 100644
index 0000000..69d5564
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kududb.flume.sink;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.kududb.client.Insert;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+import org.kududb.client.PartialRow;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A simple event producer that generates a single Kudu insert per event,
+ * writing the event body into one binary column. The headers are discarded.
+ *
+ * Optional parameters: <p>
+ * <tt>payloadColumn:</tt> Which column to put payload in. If it is not set,
+ * the column name "payload" will be assumed.<p>
+ */
+public class SimpleKuduEventProducer implements KuduEventProducer {
+  private byte[] payload;
+  private KuduTable table;
+  private String payloadColumn;
+
+  public SimpleKuduEventProducer(){
+  }
+
+  @Override
+  public void configure(Context context) {
+    payloadColumn = context.getString("payloadColumn","payload");
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+
+  @Override
+  public void initialize(Event event, KuduTable table) {
+    this.payload = event.getBody();
+    this.table = table;
+  }
+
+  @Override
+  public List<Operation> getOperations() throws FlumeException {
+    try {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addBinary(payloadColumn, payload);
+
+      return Collections.singletonList((Operation) insert);
+    } catch (Exception e){
+      throw new FlumeException("Failed to create Kudu Insert object!", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
new file mode 100644
index 0000000..a0910dd
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kududb.flume.sink;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.CreateTableOptions;
+import org.kududb.client.KuduTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class KuduSinkTest extends BaseKuduTest {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
+
+  private KuduTable createNewTable(String tableName) {
+    LOG.info("Creating new table...");
+
+    CreateTableOptions options = new CreateTableOptions().setNumReplicas(1);
+    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
+    basicSchema = new Schema(columns);
+    KuduTable table = createTable(tableName, basicSchema, options);
+
+    LOG.info("Created new table.");
+
+    return table;
+  }
+
+  @Test
+  public void testMandatoryParameters() {
+    LOG.info("Testing mandatory parameters...");
+
+    KuduSink sink = new KuduSink(syncClient);
+
+    HashMap<String, String> parameters = new HashMap<>();
+    Context context = new Context(parameters);
+    try {
+      Configurables.configure(sink, context);
+      Assert.fail("Should have failed due to missing properties");
+    } catch (NullPointerException npe) {
+        //good
+    }
+
+    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "tableName");
+    context = new Context(parameters);
+    try {
+      Configurables.configure(sink, context);
+      Assert.fail("Should have failed due to missing properties");
+    } catch (NullPointerException npe) {
+        //good
+    }
+
+    LOG.info("Testing mandatory parameters finished successfully.");
+  }
+
+  @Test(expected = FlumeException.class)
+  public void testMissingTable() throws Exception {
+    LOG.info("Testing missing table...");
+
+    KuduSink sink = createSink("missingTable");
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    LOG.info("Testing missing table finished successfully.");
+  }
+
+  @Test
+  public void testEmptyChannelWithDefaults() throws Exception {
+    testEventsWithDefaults(0);
+  }
+
+  @Test
+  public void testOneEventWithDefaults() throws Exception {
+    testEventsWithDefaults(1);
+  }
+
+  @Test
+  public void testThreeEventsWithDefaults() throws Exception {
+    testEventsWithDefaults(3);
+  }
+
+  @Test
+  public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
+    doTestDuplicateRows(true);
+  }
+
+  @Test
+  public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
+    doTestDuplicateRows(false);
+  }
+
+  private void doTestDuplicateRows(boolean ignoreDuplicateRows) {
+    KuduTable table = createNewTable("testDuplicateRows" + ignoreDuplicateRows);
+    String tableName = table.getName();
+    Context sinkContext = new Context();
+    sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
+                    Boolean.toString(ignoreDuplicateRows));
+    KuduSink sink = createSink(tableName, sinkContext);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    for (int i = 0; i < 2; i++) {
+      Event e = EventBuilder.withBody("key-0", Charsets.UTF_8); // Duplicate keys.
+      channel.put(e);
+    }
+
+    tx.commit();
+    tx.close();
+
+    try {
+      Sink.Status status = sink.process();
+      if (!ignoreDuplicateRows) {
+        fail("Incorrectly ignored duplicate rows!");
+      }
+      assertTrue("incorrect status for non-empty channel", status == Sink.Status.READY);
+    } catch (EventDeliveryException e) {
+      if (ignoreDuplicateRows) {
+        throw new AssertionError("Failed to ignore duplicate rows!", e);
+      } else {
+        LOG.info("Correctly did not ignore duplicate rows", e);
+        return;
+      }
+    }
+
+    // We only get here if the process() succeeded with duplicates ignored.
+    try {
+      List<String> rows = scanTableToStrings(table);
+      assertEquals("1 row expected", 1, rows.size());
+    } catch (Exception e) {
+      Throwables.propagate(e);
+    }
+
+    LOG.info("Testing duplicate events finished successfully.");
+  }
+
+  private void testEventsWithDefaults(int eventCount) throws Exception {
+    LOG.info("Testing {} events...", eventCount);
+
+    KuduTable table = createNewTable("test" + eventCount + "events");
+    String tableName = table.getName();
+    KuduSink sink = createSink(tableName);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+
+    for (int i = 0; i < eventCount; i++) {
+      Event e = EventBuilder.withBody(String.format("payload body %s", i), Charsets.UTF_8);
+      channel.put(e);
+    }
+
+    tx.commit();
+    tx.close();
+
+    Sink.Status status = sink.process();
+    if (eventCount == 0) {
+      assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
+    } else {
+      assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
+    }
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+    for (int i = 0; i < eventCount; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+
+    LOG.info("Testing {} events finished successfully.", eventCount);
+  }
+
+  private KuduSink createSink(String tableName) {
+    return createSink(tableName, new Context());
+  }
+
+  private KuduSink createSink(String tableName, Context ctx) {
+    LOG.info("Creating Kudu sink for '{}' table...", tableName);
+
+    KuduSink sink = new KuduSink(syncClient);
+    HashMap<String, String> parameters = new HashMap<>();
+    parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
+    parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddresses());
+    Context context = new Context(parameters);
+    context.putAll(ctx.getParameters());
+    Configurables.configure(sink, context);
+
+    LOG.info("Created Kudu sink for '{}' table.", tableName);
+
+    return sink;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index d31a027..987764c 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -74,6 +74,7 @@
         <module>kudu-client-tools</module>
         <module>kudu-mapreduce</module>
         <module>kudu-spark</module>
+        <module>kudu-flume-sink</module>
     </modules>
 
     <build>