You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/03/30 03:54:31 UTC
incubator-kudu git commit: Add Kudu Flume sink.
Repository: incubator-kudu
Updated Branches:
refs/heads/master c8598ff04 -> fa70abe85
Add Kudu Flume sink.
Implement a Flume sink to write data to Kudu.
Change-Id: I53e02580908ba2468b216543719ebe5011a267c3
Reviewed-on: http://gerrit.cloudera.org:8080/2600
Reviewed-by: Mike Percy <mp...@apache.org>
Reviewed-by: Jean-Daniel Cryans
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/fa70abe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/fa70abe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/fa70abe8
Branch: refs/heads/master
Commit: fa70abe8559b4d8ed9438aae1c3e1027c8662997
Parents: c8598ff
Author: arae <ar...@gmail.com>
Authored: Tue Mar 1 16:45:51 2016 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Wed Mar 30 01:53:59 2016 +0000
----------------------------------------------------------------------
java/kudu-flume-sink/pom.xml | 68 +++++
.../kududb/flume/sink/KuduEventProducer.java | 59 ++++
.../java/org/kududb/flume/sink/KuduSink.java | 267 +++++++++++++++++++
.../sink/KuduSinkConfigurationConstants.java | 67 +++++
.../flume/sink/SimpleKuduEventProducer.java | 81 ++++++
.../org/kududb/flume/sink/KuduSinkTest.java | 243 +++++++++++++++++
java/pom.xml | 1 +
7 files changed, 786 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/pom.xml b/java/kudu-flume-sink/pom.xml
new file mode 100644
index 0000000..e7d796b
--- /dev/null
+++ b/java/kudu-flume-sink/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>kudu-parent</artifactId>
+ <groupId>org.kududb</groupId>
+ <version>0.8.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>kudu-flume-sink</artifactId>
+ <name>Kudu Flume NG Sink</name>
+
+ <properties>
+ <flume.version>1.6.0</flume.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <version>${flume.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>${flume.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-configuration</artifactId>
+ <version>${flume.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.kududb</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.kududb</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java
new file mode 100644
index 0000000..7166300
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduEventProducer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kududb.flume.sink;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+
+import java.util.List;
+
+/**
+ * Interface for an event producer which produces Kudu Operations to write
+ * the headers and body of an event in a Kudu table. This is configurable,
+ * so any config params required should be taken through this. The columns
+ * should exist in the table specified in the configuration for the KuduSink.
+ *
+ * KuduSink instantiates a single producer, configures it with the sink's
+ * <tt>producer.</tt>-prefixed properties, and then, for every event it takes
+ * from the channel, calls {@link #initialize} followed by {@link #getOperations}.
+ * Implementations therefore hold per-event state between those two calls.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface KuduEventProducer extends Configurable, ConfigurableComponent {
+ /**
+ * Initialize the event producer for a single event. KuduSink calls this once
+ * per event, immediately before {@link #getOperations()}.
+ * @param event the Flume event to be written to Kudu
+ * @param table the KuduTable object used for creating Kudu Operation objects
+ */
+ void initialize(Event event, KuduTable table);
+
+ /**
+ * Get the operations that should be written out to Kudu as a result of the
+ * event passed to the most recent {@link #initialize} call. KuduSink applies
+ * every returned operation to its session and flushes them with the rest of
+ * the current batch.
+ * @return List of {@link org.kududb.client.Operation} which
+ * are written as such to Kudu
+ */
+ List<Operation> getOperations();
+
+ /**
+ * Clean up any state. Intended to be called when the sink is being stopped.
+ */
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java
new file mode 100644
index 0000000..917c0b3
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSink.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kududb.flume.sink;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+import org.kududb.client.AsyncKuduClient;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduSession;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+import org.kududb.client.OperationResponse;
+import org.kududb.client.SessionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A simple sink which reads events from a channel and writes them to Kudu.
+ * To use this sink, it has to be configured with certain mandatory parameters:<p>
+ * <tt>tableName: </tt> The name of the table in Kudu to write to. <p>
+ * <tt>masterAddresses: </tt> Comma-separated list of "host:port" pairs of the masters (port optional). <p>
+ * Optional parameters:<p>
+ * <tt>batchSize: </tt> Maximum number of events taken from the channel per transaction;
+ * must be positive (default 100). <p>
+ * <tt>timeoutMillis: </tt> Timeout for Kudu operations, in milliseconds
+ * (default {@link AsyncKuduClient#DEFAULT_OPERATION_TIMEOUT_MS}). <p>
+ * <tt>ignoreDuplicateRows: </tt> Whether inserts of already-existing keys are ignored
+ * instead of failing the batch (default true). <p>
+ * <tt>producer: </tt> Fully qualified class name of the {@link KuduEventProducer} to use
+ * (default {@link SimpleKuduEventProducer}). Properties prefixed with <tt>producer.</tt>
+ * are handed to the producer through its own Context. <p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduSink extends AbstractSink implements Configurable {
+  private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
+  private static final Long DEFAULT_BATCH_SIZE = 100L;
+  private static final Long DEFAULT_TIMEOUT_MILLIS =
+      AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+  private static final String DEFAULT_KUDU_EVENT_PRODUCER =
+      "org.kududb.flume.sink.SimpleKuduEventProducer";
+  private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
+
+  private String masterAddresses;
+  private String tableName;
+  private long batchSize;
+  private long timeoutMillis;
+  private boolean ignoreDuplicateRows;
+  private KuduTable table;
+  private KuduSession session;
+  private KuduClient client;
+  private KuduEventProducer eventProducer;
+  private String eventProducerType;
+  private Context producerContext;
+  private SinkCounter sinkCounter;
+
+  /** Creates a sink that builds its own Kudu client from the configured master addresses. */
+  public KuduSink() {
+    this(null);
+  }
+
+  /**
+   * Creates a sink that uses the given client instead of building one in {@link #start()}.
+   * Note that {@link #stop()} shuts this client down.
+   *
+   * @param kuduClient the client to use, or null to build one from masterAddresses
+   */
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public KuduSink(KuduClient kuduClient) {
+    this.client = kuduClient;
+  }
+
+  /**
+   * Connects to Kudu (unless a client was injected), opens the configured table and
+   * creates a manually-flushed session for writing.
+   *
+   * @throws IllegalStateException if the sink is started again without being stopped
+   * @throws FlumeException if the table cannot be opened
+   */
+  @Override
+  public void start() {
+    Preconditions.checkState(table == null && session == null, "Please call stop " +
+        "before calling start on an old instance.");
+
+    // This is not null only inside tests.
+    if (client == null) {
+      client = new KuduClient.KuduClientBuilder(masterAddresses).build();
+    }
+
+    // Open the table before creating the session: if opening fails, both table and
+    // session stay null, so the precondition above does not block a retry of start().
+    try {
+      table = client.openTable(tableName);
+    } catch (Exception e) {
+      sinkCounter.incrementConnectionFailedCount();
+      String msg = String.format("Could not open table '%s' from Kudu", tableName);
+      logger.error(msg, e);
+      throw new FlumeException(msg, e);
+    }
+
+    session = client.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    session.setTimeoutMillis(timeoutMillis);
+    session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
+
+    super.start();
+    sinkCounter.incrementConnectionCreatedCount();
+    sinkCounter.start();
+  }
+
+  /**
+   * Closes the event producer, shuts down the Kudu client and resets the sink so that
+   * {@link #start()} may be called again.
+   *
+   * @throws FlumeException if the client could not be shut down
+   */
+  @Override
+  public void stop() {
+    try {
+      // KuduEventProducer#close() is documented as being called when the sink stops.
+      if (eventProducer != null) {
+        eventProducer.close();
+      }
+    } finally {
+      // Always release the client, even if the producer failed to close.
+      shutdownClient();
+    }
+  }
+
+  /** Shuts down the client and resets connection state, counters and lifecycle state. */
+  private void shutdownClient() {
+    try {
+      if (client != null) {
+        client.shutdown();
+      }
+    } catch (Exception e) {
+      throw new FlumeException("Error closing client.", e);
+    } finally {
+      // Reset even if shutdown failed; otherwise start()'s precondition rejects a restart.
+      client = null;
+      table = null;
+      session = null;
+      sinkCounter.incrementConnectionClosedCount();
+      sinkCounter.stop();
+      // Pairs with super.start() in start() so AbstractSink's lifecycle state is updated.
+      super.stop();
+    }
+  }
+
+  /**
+   * Reads the sink settings from the Flume context and instantiates the configured event
+   * producer, configuring it with the <tt>producer.</tt>-prefixed sub-properties.
+   *
+   * @throws NullPointerException if masterAddresses or tableName is missing
+   * @throws IllegalArgumentException if batchSize is not positive
+   * @throws RuntimeException if the event producer cannot be loaded, instantiated or configured
+   */
+  @Override
+  public void configure(Context context) {
+    masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
+    tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
+
+    batchSize = context.getLong(
+        KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE);
+    timeoutMillis = context.getLong(
+        KuduSinkConfigurationConstants.TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
+    ignoreDuplicateRows = context.getBoolean(
+        KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
+    eventProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER);
+
+    Preconditions.checkNotNull(masterAddresses,
+        "Master address cannot be empty, please specify '" +
+        KuduSinkConfigurationConstants.MASTER_ADDRESSES +
+        "' in configuration file");
+    Preconditions.checkNotNull(tableName,
+        "Table name cannot be empty, please specify '" +
+        KuduSinkConfigurationConstants.TABLE_NAME +
+        "' in configuration file");
+    // A non-positive batch size would make process() never take an event from the channel.
+    Preconditions.checkArgument(batchSize > 0, "'%s' must be positive, got %s",
+        KuduSinkConfigurationConstants.BATCH_SIZE, batchSize);
+
+    // Check for event producer, if null set event producer type.
+    if (eventProducerType == null || eventProducerType.isEmpty()) {
+      eventProducerType = DEFAULT_KUDU_EVENT_PRODUCER;
+      logger.info("No Kudu event producer defined, will use default");
+    }
+
+    producerContext = new Context();
+    producerContext.putAll(context.getSubProperties(
+        KuduSinkConfigurationConstants.PRODUCER_PREFIX));
+
+    try {
+      // asSubclass rejects classes that are not KuduEventProducers with a clear
+      // ClassCastException, without needing an unchecked cast.
+      Class<? extends KuduEventProducer> clazz =
+          Class.forName(eventProducerType).asSubclass(KuduEventProducer.class);
+      eventProducer = clazz.newInstance();
+      eventProducer.configure(producerContext);
+    } catch (Exception e) {
+      logger.error("Could not instantiate Kudu event producer.", e);
+      throw Throwables.propagate(e);
+    }
+    sinkCounter = new SinkCounter(this.getName());
+  }
+
+  /**
+   * @return the Kudu client used by this sink; null before {@link #start()} (unless one was
+   *     injected) and after {@link #stop()}
+   */
+  public KuduClient getClient() {
+    return client;
+  }
+
+  /**
+   * Takes up to batchSize events from the channel in one Flume transaction, applies the
+   * producer's operations for each to the Kudu session and flushes them. The transaction is
+   * committed only if the flush reports no row errors; otherwise it is rolled back.
+   *
+   * @return READY if at least one event was written; BACKOFF if the channel was empty or the
+   *     session still has pending operations
+   * @throws EventDeliveryException if writing to Kudu failed; the transaction is rolled back
+   */
+  @Override
+  public Status process() throws EventDeliveryException {
+    if (session.hasPendingOperations()) {
+      // If for whatever reason we have pending operations then just refuse to process
+      // and tell caller to try again a bit later. We don't want to pile on the kudu
+      // session object.
+      return Status.BACKOFF;
+    }
+
+    Channel channel = getChannel();
+    Transaction txn = channel.getTransaction();
+
+    txn.begin();
+
+    try {
+      long txnEventCount = 0;
+      for (; txnEventCount < batchSize; txnEventCount++) {
+        Event event = channel.take();
+        if (event == null) {
+          break;
+        }
+
+        eventProducer.initialize(event, table);
+        List<Operation> operations = eventProducer.getOperations();
+        for (Operation o : operations) {
+          session.apply(o);
+        }
+      }
+
+      logger.debug("Flushing {} events", txnEventCount);
+      List<OperationResponse> responses = session.flush();
+      if (responses != null) {
+        for (OperationResponse response : responses) {
+          // Any row error (e.g. a duplicate key while ignoreDuplicateRows is false) fails
+          // the whole batch: throwing rolls back the Flume transaction so its events stay
+          // in the channel and are retried. Rows Kudu already applied are not undone, so a
+          // replay can hit duplicate-key row errors; ignoreDuplicateRows exists for that
+          // (Kudu doesn't support "insert or overwrite" semantics yet).
+          if (response.hasRowError()) {
+            throw new EventDeliveryException("Failed to flush one or more changes. " +
+                "Transaction rolled back: " + response.getRowError().toString());
+          }
+        }
+      }
+
+      if (txnEventCount == 0) {
+        sinkCounter.incrementBatchEmptyCount();
+      } else if (txnEventCount == batchSize) {
+        sinkCounter.incrementBatchCompleteCount();
+      } else {
+        sinkCounter.incrementBatchUnderflowCount();
+      }
+
+      txn.commit();
+
+      if (txnEventCount == 0) {
+        return Status.BACKOFF;
+      }
+
+      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+      return Status.READY;
+
+    } catch (Throwable e) {
+      txn.rollback();
+
+      String msg = "Failed to commit transaction. Transaction rolled back.";
+      logger.error(msg, e);
+      if (e instanceof Error || e instanceof RuntimeException) {
+        throw Throwables.propagate(e);
+      }
+      throw new EventDeliveryException(msg, e);
+    } finally {
+      txn.close();
+    }
+  }
+
+  /** @return the event producer created by {@link #configure(Context)}; for tests only */
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  KuduEventProducer getEventProducer() {
+    return eventProducer;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java
new file mode 100644
index 0000000..6486137
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/KuduSinkConfigurationConstants.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kududb.flume.sink;
+
+import org.kududb.annotations.InterfaceAudience;
+import org.kududb.annotations.InterfaceStability;
+
+/**
+ * Names of the configuration properties understood by {@link KuduSink}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduSinkConfigurationConstants {
+  // ---- Required by KuduSink ----
+
+  /** Kudu master addresses, as a comma-separated "host:port" list; the port may be omitted. */
+  public static final String MASTER_ADDRESSES = "masterAddresses";
+
+  /** Name of the Kudu table that events are written to. */
+  public static final String TABLE_NAME = "tableName";
+
+  // ---- Optional ----
+
+  /** Upper bound on the number of events taken from the channel in one transaction. */
+  public static final String BATCH_SIZE = "batchSize";
+
+  /** Kudu operation timeout, in milliseconds. */
+  public static final String TIMEOUT_MILLIS = "timeoutMillis";
+
+  /** Whether errors caused by inserting an already-existing row into Kudu are ignored. */
+  public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
+
+  /** Fully qualified class name of the {@link KuduEventProducer} the sink instantiates. */
+  public static final String PRODUCER = "producer";
+
+  /**
+   * Prefix of the properties that are forwarded to the event producer's own
+   * configuration context (for example <tt>producer.payloadColumn</tt>).
+   */
+  public static final String PRODUCER_PREFIX = PRODUCER + ".";
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java
new file mode 100644
index 0000000..69d5564
--- /dev/null
+++ b/java/kudu-flume-sink/src/main/java/org/kududb/flume/sink/SimpleKuduEventProducer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.kududb.flume.sink;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.kududb.client.Insert;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Operation;
+import org.kududb.client.PartialRow;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A basic {@link KuduEventProducer} that turns each Flume event into exactly one
+ * Kudu {@link Insert}, storing the raw event body in a single binary column.
+ * Event headers are not written.
+ *
+ * Optional parameters: <p>
+ * <tt>payloadColumn:</tt> Name of the column that receives the event body.
+ * Defaults to <tt>payload</tt>.<p>
+ */
+public class SimpleKuduEventProducer implements KuduEventProducer {
+  private byte[] payload;
+  private KuduTable table;
+  private String payloadColumn;
+
+  public SimpleKuduEventProducer() {
+  }
+
+  /** Reads the optional {@code payloadColumn} setting, falling back to "payload". */
+  @Override
+  public void configure(Context context) {
+    String columnName = context.getString("payloadColumn", "payload");
+    payloadColumn = columnName;
+  }
+
+  /** Does nothing; all settings are read in {@link #configure(Context)}. */
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+
+  /** Remembers the event body and the target table for the next {@link #getOperations()}. */
+  @Override
+  public void initialize(Event event, KuduTable table) {
+    this.payload = event.getBody();
+    this.table = table;
+  }
+
+  /**
+   * Builds a single insert whose payload column holds the body of the current event.
+   *
+   * @throws FlumeException if the insert cannot be created, e.g. the column is missing
+   */
+  @Override
+  public List<Operation> getOperations() throws FlumeException {
+    Insert insert;
+    try {
+      insert = table.newInsert();
+      insert.getRow().addBinary(payloadColumn, payload);
+    } catch (Exception e) {
+      throw new FlumeException("Failed to create Kudu Insert object!", e);
+    }
+    return Collections.<Operation>singletonList(insert);
+  }
+
+  /** Nothing to release: this producer holds no external resources. */
+  @Override
+  public void close() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
new file mode 100644
index 0000000..a0910dd
--- /dev/null
+++ b/java/kudu-flume-sink/src/test/java/org/kududb/flume/sink/KuduSinkTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kududb.flume.sink;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.kududb.ColumnSchema;
+import org.kududb.Schema;
+import org.kududb.Type;
+import org.kududb.client.BaseKuduTest;
+import org.kududb.client.CreateTableOptions;
+import org.kududb.client.KuduTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class KuduSinkTest extends BaseKuduTest {
+ private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
+
+ private KuduTable createNewTable(String tableName) {
+ LOG.info("Creating new table...");
+
+ CreateTableOptions options = new CreateTableOptions().setNumReplicas(1);
+ ArrayList<ColumnSchema> columns = new ArrayList<>(1);
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
+ basicSchema = new Schema(columns);
+ KuduTable table = createTable(tableName, basicSchema, options);
+
+ LOG.info("Created new table.");
+
+ return table;
+ }
+
+ @Test
+ public void testMandatoryParameters() {
+ LOG.info("Testing mandatory parameters...");
+
+ KuduSink sink = new KuduSink(syncClient);
+
+ HashMap<String, String> parameters = new HashMap<>();
+ Context context = new Context(parameters);
+ try {
+ Configurables.configure(sink, context);
+ Assert.fail("Should have failed due to missing properties");
+ } catch (NullPointerException npe) {
+ //good
+ }
+
+ parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "tableName");
+ context = new Context(parameters);
+ try {
+ Configurables.configure(sink, context);
+ Assert.fail("Should have failed due to missing properties");
+ } catch (NullPointerException npe) {
+ //good
+ }
+
+ LOG.info("Testing mandatory parameters finished successfully.");
+ }
+
+ @Test(expected = FlumeException.class)
+ public void testMissingTable() throws Exception {
+ LOG.info("Testing missing table...");
+
+ KuduSink sink = createSink("missingTable");
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
+ sink.setChannel(channel);
+ sink.start();
+
+ LOG.info("Testing missing table finished successfully.");
+ }
+
+ @Test
+ public void testEmptyChannelWithDefaults() throws Exception {
+ testEventsWithDefaults(0);
+ }
+
+ @Test
+ public void testOneEventWithDefaults() throws Exception {
+ testEventsWithDefaults(1);
+ }
+
+ @Test
+ public void testThreeEventsWithDefaults() throws Exception {
+ testEventsWithDefaults(3);
+ }
+
+ @Test
+ public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
+ doTestDuplicateRows(true);
+ }
+
+ @Test
+ public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
+ doTestDuplicateRows(false);
+ }
+
+ private void doTestDuplicateRows(boolean ignoreDuplicateRows) {
+ KuduTable table = createNewTable("testDuplicateRows" + ignoreDuplicateRows);
+ String tableName = table.getName();
+ Context sinkContext = new Context();
+ sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
+ Boolean.toString(ignoreDuplicateRows));
+ KuduSink sink = createSink(tableName, sinkContext);
+
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
+ sink.setChannel(channel);
+ sink.start();
+
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+
+ for (int i = 0; i < 2; i++) {
+ Event e = EventBuilder.withBody("key-0", Charsets.UTF_8); // Duplicate keys.
+ channel.put(e);
+ }
+
+ tx.commit();
+ tx.close();
+
+ try {
+ Sink.Status status = sink.process();
+ if (!ignoreDuplicateRows) {
+ fail("Incorrectly ignored duplicate rows!");
+ }
+ assertTrue("incorrect status for empty channel", status == Sink.Status.READY);
+ } catch (EventDeliveryException e) {
+ if (ignoreDuplicateRows) {
+ throw new AssertionError("Failed to ignore duplicate rows!", e);
+ } else {
+ LOG.info("Correctly did not ignore duplicate rows", e);
+ return;
+ }
+ }
+
+ // We only get here if the process() succeeded.
+ try {
+ List<String> rows = scanTableToStrings(table);
+ assertEquals("1 row expected", 1, rows.size());
+ } catch (Exception e) {
+ Throwables.propagate(e);
+ }
+
+ LOG.info("Testing duplicate events finished successfully.");
+ }
+
+ private void testEventsWithDefaults(int eventCount) throws Exception {
+ LOG.info("Testing {} events...", eventCount);
+
+ KuduTable table = createNewTable("test" + eventCount + "events");
+ String tableName = table.getName();
+ KuduSink sink = createSink(tableName);
+
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
+ sink.setChannel(channel);
+ sink.start();
+
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+
+ for (int i = 0; i < eventCount; i++) {
+ Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes());
+ channel.put(e);
+ }
+
+ tx.commit();
+ tx.close();
+
+ Sink.Status status = sink.process();
+ if (eventCount == 0) {
+ assertTrue("incorrect status for empty channel", status == Sink.Status.BACKOFF);
+ } else {
+ assertTrue("incorrect status for non-empty channel", status != Sink.Status.BACKOFF);
+ }
+
+ List<String> rows = scanTableToStrings(table);
+ assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+ for (int i = 0; i < eventCount; i++) {
+ assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+ }
+
+ LOG.info("Testing {} events finished successfully.", eventCount);
+ }
+
+ private KuduSink createSink(String tableName) {
+ return createSink(tableName, new Context());
+ }
+
+ private KuduSink createSink(String tableName, Context ctx) {
+ LOG.info("Creating Kudu sink for '{}' table...", tableName);
+
+ KuduSink sink = new KuduSink(syncClient);
+ HashMap<String, String> parameters = new HashMap<>();
+ parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
+ parameters.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, getMasterAddresses());
+ Context context = new Context(parameters);
+ context.putAll(ctx.getParameters());
+ Configurables.configure(sink, context);
+
+ LOG.info("Created Kudu sink for '{}' table.", tableName);
+
+ return sink;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fa70abe8/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index d31a027..987764c 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -74,6 +74,7 @@
<module>kudu-client-tools</module>
<module>kudu-mapreduce</module>
<module>kudu-spark</module>
+ <module>kudu-flume-sink</module>
</modules>
<build>