You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2019/11/13 01:52:53 UTC
[flume] branch trunk updated: FLUME-3345 Add Kudu Flume Sinks
This is an automated email from the ASF dual-hosted git repository.
mpercy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/flume.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9dafe98 FLUME-3345 Add Kudu Flume Sinks
9dafe98 is described below
commit 9dafe98972a652aa1a202c6df9c08139d2bea592
Author: Grant Henke <gh...@cloudera.com>
AuthorDate: Wed Sep 18 16:43:57 2019 -0500
FLUME-3345 Add Kudu Flume Sinks
This patch adds the Flume sinks that were previously maintained
in the Apache Kudu project.
---
.../config/checkstyle/checkstyle-suppressions.xml | 2 +-
flume-ng-sinks/flume-ng-kudu-sink/pom.xml | 192 ++++++++++
.../sink/kudu/AvroKuduOperationsProducer.java | 282 +++++++++++++++
.../flume/sink/kudu/KuduOperationsProducer.java | 54 +++
.../java/org/apache/flume/sink/kudu/KuduSink.java | 315 ++++++++++++++++
.../sink/kudu/KuduSinkConfigurationConstants.java | 77 ++++
.../sink/kudu/RegexpKuduOperationsProducer.java | 398 +++++++++++++++++++++
.../kudu/SimpleKeyedKuduOperationsProducer.java | 136 +++++++
.../sink/kudu/SimpleKuduOperationsProducer.java | 92 +++++
.../apache/flume/sink/kudu/KuduSinkTestUtil.java | 99 +++++
.../sink/kudu/TestAvroKuduOperationsProducer.java | 183 ++++++++++
.../sink/kudu/TestKeyedKuduOperationsProducer.java | 189 ++++++++++
.../org/apache/flume/sink/kudu/TestKuduSink.java | 213 +++++++++++
.../kudu/TestRegexpKuduOperationsProducer.java | 184 ++++++++++
...TestRegexpKuduOperationsProducerParseError.java | 291 +++++++++++++++
.../apache/flume/sink/kudu/TestSecureKuduSink.java | 126 +++++++
.../src/test/resources/log4j2.xml | 32 ++
.../resources/testAvroKuduOperationsProducer.avsc | 11 +
flume-ng-sinks/pom.xml | 1 +
pom.xml | 5 +-
20 files changed, 2879 insertions(+), 3 deletions(-)
diff --git a/build-support/src/main/resources/config/checkstyle/checkstyle-suppressions.xml b/build-support/src/main/resources/config/checkstyle/checkstyle-suppressions.xml
index 5981468..0636775 100644
--- a/build-support/src/main/resources/config/checkstyle/checkstyle-suppressions.xml
+++ b/build-support/src/main/resources/config/checkstyle/checkstyle-suppressions.xml
@@ -29,7 +29,7 @@ under the License.
<!-- Suppress all style checks for generated code -->
<suppress checks=".*"
- files="generated-sources|com[/\\]cloudera[/\\]flume[/\\]handlers[/\\]thrift|org[/\\]apache[/\\]flume[/\\]thrift[/\\]|org[/\\]apache[/\\]flume[/\\]source[/\\]scribe|ProtosFactory.java"/>
+ files="generated-(test-)?sources|com[/\\]cloudera[/\\]flume[/\\]handlers[/\\]thrift|org[/\\]apache[/\\]flume[/\\]thrift[/\\]|org[/\\]apache[/\\]flume[/\\]source[/\\]scribe|ProtosFactory.java"/>
<!-- The "legacy" sources have a weird camelCaps package name -->
<suppress checks="PackageName"
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/pom.xml b/flume-ng-sinks/flume-ng-kudu-sink/pom.xml
new file mode 100644
index 0000000..ee0665d
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/pom.xml
@@ -0,0 +1,192 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>flume-ng-sinks</artifactId>
+ <groupId>org.apache.flume</groupId>
+ <version>1.10.0-SNAPSHOT</version>
+ </parent>
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-ng-kudu-sink</artifactId>
+ <name>Flume Kudu Sink</name>
+
+  <properties>
+    <!-- Surefire exclude pattern. The placeholder value "None" matches no test class,
+         so every test runs by default; the kudu-windows profile overrides it. -->
+    <exclude.tests>None</exclude.tests>
+  </properties>
+
+  <build>
+    <extensions>
+      <!-- Used to find the right kudu-binary artifact with the Maven
+           property ${os.detected.classifier} -->
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>1.6.2</version>
+      </extension>
+    </extensions>
+    <plugins>
+      <!-- Generates Java classes from the *.avsc schemas under src/test/resources. -->
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <testSourceDirectory>${basedir}/src/test/resources</testSourceDirectory>
+              <testIncludes>
+                <testInclude>**/*.avsc</testInclude>
+              </testIncludes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Skips tests matching ${exclude.tests} (see the properties section above and
+           the kudu-windows profile). -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>${exclude.tests}</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+      <!-- Additionally packages this module's test classes as a test-jar. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- NOTE(review): entries without a <version> are presumably version-managed by a
+         parent POM; verify there. -->
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- ${kudu.version} is not defined in this POM; presumably it is set by a parent
+         POM. -->
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>${kudu.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kudu</groupId>
+      <artifactId>kudu-test-utils</artifactId>
+      <version>${kudu.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <!-- kudu-binary is published per platform; the Linux and Mac profiles below pick the
+         classifier matching the build machine via ${os.detected.classifier}, which is
+         provided by the os-maven-plugin build extension. -->
+    <profile>
+      <id>kudu-windows</id>
+      <activation>
+        <os>
+          <family>Windows</family>
+        </os>
+      </activation>
+      <properties>
+        <!-- Kudu tests do not support Windows. -->
+        <!-- Excludes every test class; no kudu-binary dependency is added here. -->
+        <exclude.tests>**/*.java</exclude.tests>
+      </properties>
+    </profile>
+    <!-- NOTE(review): Maven's "unix" OS family most likely also matches macOS, in which
+         case both kudu-linux and kudu-mac activate on a Mac and declare the same
+         dependency; confirm whether kudu-mac is still needed. -->
+    <profile>
+      <id>kudu-linux</id>
+      <activation>
+        <os>
+          <family>Unix</family>
+        </os>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.kudu</groupId>
+          <artifactId>kudu-binary</artifactId>
+          <version>${kudu.version}</version>
+          <classifier>${os.detected.classifier}</classifier>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>kudu-mac</id>
+      <activation>
+        <os>
+          <family>mac</family>
+        </os>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.kudu</groupId>
+          <artifactId>kudu-binary</artifactId>
+          <version>${kudu.version}</version>
+          <classifier>${os.detected.classifier}</classifier>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/AvroKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/AvroKuduOperationsProducer.java
new file mode 100644
index 0000000..bdc3b79
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/AvroKuduOperationsProducer.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+/**
+ * An Avro serializer that generates one operation per event by deserializing the event
+ * body as an Avro record and mapping its fields to columns in a Kudu table.
+ *
+ * <p>The writer schema for an event is taken, in order of preference, from the
+ * {@code flume.avro.schema.url} header, the {@code flume.avro.schema.literal} header, or the
+ * {@code producer.schemaPath} property. Parsed schemas and their readers are kept in
+ * unbounded static caches keyed by URL, by literal text and by schema respectively.
+ *
+ * <p>Every column of the Kudu table is filled from the record field of the same name. A null
+ * value sets a nullable column to null and leaves a non-nullable column unset, so that a Kudu
+ * default can apply. Because Avro has no 8- or 16-bit integer type, {@code INT8} and
+ * {@code INT16} columns accept Avro {@code int} values that fit the column's range;
+ * {@code BINARY} columns accept Avro {@code bytes} and {@code fixed} values.
+ *
+ * <p><strong>Avro Kudu Operations Producer configuration parameters</strong>
+ * <table cellpadding=3 cellspacing=0 border=1
+ * summary="Avro Kudu Operations Producer configuration parameters">
+ * <tr><th>Property Name</th>
+ * <th>Default</th>
+ * <th>Required?</th>
+ * <th>Description</th></tr>
+ * <tr>
+ * <td>producer.operation</td>
+ * <td>upsert</td>
+ * <td>No</td>
+ * <td>The operation used to write events to Kudu.
+ * Supported operations are 'insert' and 'upsert'</td>
+ * </tr>
+ * <tr>
+ * <td>producer.schemaPath</td>
+ * <td></td>
+ * <td>No</td>
+ * <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies.
+ * It's used whenever an event does not include its own schema. If not specified, the
+ * schema must be specified on a per-event basis, either by url or as a literal.
+ * Schemas must be record type.</td>
+ * </tr>
+ * </table>
+ */
+public class AvroKuduOperationsProducer implements KuduOperationsProducer {
+  public static final String OPERATION_PROP = "operation";
+  public static final String SCHEMA_PROP = "schemaPath";
+  public static final String DEFAULT_OPERATION = "upsert";
+  public static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";
+  public static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";
+
+  private String operation = "";
+  /** The last decoded record, handed back to the reader so it can be reused. */
+  private GenericRecord reuse;
+  private KuduTable table;
+  private String defaultSchemaUrl;
+
+  /**
+   * The binary decoder to reuse for event parsing.
+   */
+  private BinaryDecoder decoder = null;
+
+  /**
+   * A cache of schemas retrieved by URL to avoid re-parsing the schema. {@code hdfs:/} URLs
+   * are read through Hadoop's {@link FileSystem}; all other URLs through {@link URL}.
+   */
+  private static final LoadingCache<String, Schema> schemasFromURL =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<String, Schema>() {
+            @Override
+            public Schema load(String url) throws IOException {
+              InputStream is;
+              if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
+                // Resolve a Hadoop FileSystem only when it is used, so URLs whose scheme
+                // Hadoop may have no FileSystem for are still opened via java.net.URL.
+                FileSystem fs = FileSystem.get(URI.create(url), conf);
+                is = fs.open(new Path(url));
+              } else {
+                is = new URL(url).openStream();
+              }
+              try {
+                return new Schema.Parser().parse(is);
+              } finally {
+                is.close();
+              }
+            }
+          });
+
+  /**
+   * A cache of literal schemas to avoid re-parsing the schema.
+   */
+  private static final LoadingCache<String, Schema> schemasFromLiteral =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<String, Schema>() {
+            @Override
+            public Schema load(String literal) {
+              Preconditions.checkNotNull(literal,
+                  "Schema literal cannot be null without a Schema URL");
+              return new Schema.Parser().parse(literal);
+            }
+          });
+
+  /**
+   * A cache of DatumReaders per schema.
+   */
+  private static final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
+      CacheBuilder.newBuilder()
+          .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
+            @Override
+            public DatumReader<GenericRecord> load(Schema schema) {
+              return new GenericDatumReader<>(schema);
+            }
+          });
+
+  private static final Configuration conf = new Configuration();
+
+  public AvroKuduOperationsProducer() {
+  }
+
+  /**
+   * Reads the write operation (default {@code upsert}) and the optional default schema
+   * location from the producer context.
+   */
+  @Override
+  public void configure(Context context) {
+    this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION);
+
+    String schemaPath = context.getString(SCHEMA_PROP);
+    if (schemaPath != null) {
+      defaultSchemaUrl = schemaPath;
+    }
+  }
+
+  /**
+   * Remembers the target table; its schema drives the column mapping and it creates the
+   * operations.
+   */
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Decodes the event body with its schema and builds a single insert or upsert from it.
+   *
+   * @throws FlumeException if no schema is available, the body cannot be decoded, the
+   *     configured operation is unknown, or a value cannot be stored in its column
+   */
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    Schema schema = getSchema(event);
+    DatumReader<GenericRecord> reader = readers.getUnchecked(schema);
+    decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
+    try {
+      reuse = reader.read(reuse, decoder);
+    } catch (IOException e) {
+      throw new FlumeException("Cannot deserialize event", e);
+    }
+    Operation op;
+    switch (operation.toLowerCase(Locale.ENGLISH)) {
+      case "upsert":
+        op = table.newUpsert();
+        break;
+      case "insert":
+        op = table.newInsert();
+        break;
+      default:
+        throw new FlumeException(String.format("Unexpected operation %s", operation));
+    }
+    setupOp(op, reuse);
+    return Collections.singletonList(op);
+  }
+
+  /** Copies each table column's value from the same-named record field into the row. */
+  private void setupOp(Operation op, GenericRecord record) {
+    PartialRow row = op.getRow();
+    for (ColumnSchema col : table.getSchema().getColumns()) {
+      String name = col.getName();
+      Object value = record.get(name);
+      if (value == null) {
+        // Set null if nullable, otherwise leave unset for possible Kudu default.
+        if (col.isNullable()) {
+          row.setNull(name);
+        }
+      } else {
+        try {
+          switch (col.getType()) {
+            case BOOL:
+              row.addBoolean(name, (boolean) value);
+              break;
+            case INT8:
+              // Avro has no 8- or 16-bit integer type, so these columns take Avro ints.
+              row.addByte(name, (byte) narrowInt(value, Byte.MIN_VALUE, Byte.MAX_VALUE, col));
+              break;
+            case INT16:
+              row.addShort(name, (short) narrowInt(value, Short.MIN_VALUE, Short.MAX_VALUE, col));
+              break;
+            case INT32:
+              row.addInt(name, (int) value);
+              break;
+            case INT64: // Fall through
+            case UNIXTIME_MICROS:
+              row.addLong(name, (long) value);
+              break;
+            case FLOAT:
+              row.addFloat(name, (float) value);
+              break;
+            case DOUBLE:
+              row.addDouble(name, (double) value);
+              break;
+            case STRING:
+              row.addString(name, value.toString());
+              break;
+            case BINARY:
+              row.addBinary(name, toBinary(value));
+              break;
+            default:
+              throw new FlumeException(String.format(
+                  "Unrecognized type %s for column %s", col.getType().toString(), name));
+          }
+        } catch (ClassCastException e) {
+          throw new FlumeException(
+              String.format("Failed to coerce value for column '%s' to type %s",
+                  col.getName(),
+                  col.getType()), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks that an Avro {@code int} fits a narrower Kudu integer column.
+   *
+   * @return {@code value} as an {@code int}, guaranteed to lie within {@code [min, max]}
+   * @throws ClassCastException if {@code value} is not an {@link Integer}
+   * @throws FlumeException if {@code value} lies outside {@code [min, max]}
+   */
+  private static int narrowInt(Object value, int min, int max, ColumnSchema col) {
+    int intValue = (Integer) value;
+    if (intValue < min || intValue > max) {
+      throw new FlumeException(String.format(
+          "Value %d for column '%s' is out of range for type %s",
+          intValue, col.getName(), col.getType()));
+    }
+    return intValue;
+  }
+
+  /**
+   * Returns a private copy of an Avro binary value. Avro decodes {@code bytes} fields as
+   * {@link ByteBuffer} and {@code fixed} fields as {@link GenericFixed}. Because the reader is
+   * handed the previous record to reuse, those objects may be overwritten while earlier rows
+   * are still buffered, so the bytes are copied out.
+   *
+   * @throws ClassCastException if {@code value} is not a supported binary representation
+   */
+  private static byte[] toBinary(Object value) {
+    if (value instanceof ByteBuffer) {
+      // duplicate() so that reading does not move the original buffer's position.
+      ByteBuffer buffer = ((ByteBuffer) value).duplicate();
+      byte[] bytes = new byte[buffer.remaining()];
+      buffer.get(bytes);
+      return bytes;
+    }
+    if (value instanceof GenericFixed) {
+      return ((GenericFixed) value).bytes().clone();
+    }
+    return (byte[]) value;
+  }
+
+  /**
+   * Resolves the schema for an event from its headers, falling back to the configured
+   * default schema URL.
+   *
+   * @throws FlumeException if no schema source is available, or the schema cannot be
+   *     loaded or parsed
+   */
+  private Schema getSchema(Event event) throws FlumeException {
+    Map<String, String> headers = event.getHeaders();
+    String schemaUrl = headers.get(SCHEMA_URL_HEADER);
+    String schemaLiteral = headers.get(SCHEMA_LITERAL_HEADER);
+    if (schemaUrl == null && schemaLiteral == null && defaultSchemaUrl == null) {
+      // Thrown outside the try below so it is not re-wrapped as "Cannot parse schema".
+      throw new FlumeException(
+          String.format("No schema for event. " +
+              "Specify configuration property '%s' or event header '%s' or '%s'",
+              SCHEMA_PROP,
+              SCHEMA_URL_HEADER,
+              SCHEMA_LITERAL_HEADER));
+    }
+    try {
+      if (schemaUrl != null) {
+        return schemasFromURL.get(schemaUrl);
+      }
+      if (schemaLiteral != null) {
+        return schemasFromLiteral.get(schemaLiteral);
+      }
+      return schemasFromURL.get(defaultSchemaUrl);
+    } catch (ExecutionException e) {
+      throw new FlumeException("Cannot get schema", e);
+    } catch (RuntimeException e) {
+      throw new FlumeException("Cannot parse schema", e);
+    }
+  }
+
+  /** This producer holds no resources that need releasing. */
+  @Override
+  public void close() {
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduOperationsProducer.java
new file mode 100644
index 0000000..84ef34f
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduOperationsProducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.util.List;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+
+/**
+ * Turns Flume events into Kudu write operations for {@link KuduSink}.
+ *
+ * <p>An implementation is first configured (see {@link Configurable}), then bound to a table
+ * with {@link #initialize}, then asked for operations one event at a time, and finally closed
+ * when the sink stops.
+ */
+public interface KuduOperationsProducer extends Configurable, AutoCloseable {
+
+  /**
+   * Binds the producer to its target table. Invoked after {@code configure} and before
+   * the first call to {@link #getOperations}.
+   *
+   * @param table the Kudu table whose factory methods are used to build operations
+   */
+  void initialize(KuduTable table);
+
+  /**
+   * Converts a single Flume event into the Kudu operations to write for it.
+   *
+   * @param event the event to convert
+   * @return the operations to apply to Kudu for this event
+   */
+  List<Operation> getOperations(Event event);
+
+  /**
+   * Releases whatever state the producer holds; invoked when the sink is stopped.
+   * Redeclared without the checked exception of {@link AutoCloseable#close()}.
+   */
+  @Override
+  void close();
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java
new file mode 100644
index 0000000..d5e15f8
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSink.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.PrivilegedAction;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.SessionConfiguration;
+
+/**
+ * A Flume sink that reads events from a channel and writes them to a Kudu table.
+ *
+ * <p>Each call to {@link #process()} takes up to {@code batchSize} events from the channel in
+ * one Flume transaction and converts every event into Kudu operations with the configured
+ * {@link KuduOperationsProducer}. It applies them to a manually flushed {@link KuduSession}
+ * and flushes once per batch. The transaction is committed only if the flush reports no row
+ * errors; otherwise it is rolled back.
+ *
+ * <p><strong>Flume Kudu Sink configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1 summary="Flume Kudu Sink configuration parameters">
+ * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
+ * <tr><td>channel</td><td></td><td>Yes</td><td>The name of the Flume channel to read.</td></tr>
+ * <tr><td>type</td><td></td><td>Yes</td>
+ * <td>Component name. Must be {@code org.apache.flume.sink.kudu.KuduSink}</td></tr>
+ * <tr><td>masterAddresses</td><td></td><td>Yes</td>
+ * <td>Comma-separated list of "host:port" Kudu master addresses.
+ * The port is optional.</td></tr>
+ * <tr><td>tableName</td><td></td><td>Yes</td>
+ * <td>The name of the Kudu table to write to.</td></tr>
+ * <tr><td>batchSize</td><td>1000</td><td>No</td>
+ * <td>The maximum number of events the sink takes from the channel per transaction.
+ * Must be positive.</td></tr>
+ * <tr><td>ignoreDuplicateRows</td><td>true</td>
+ * <td>No</td><td>Whether to ignore duplicate primary key errors caused by inserts.</td></tr>
+ * <tr><td>timeoutMillis</td><td>{@code AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS}</td>
+ * <td>No</td><td>Timeout period for Kudu write operations, in milliseconds.</td></tr>
+ * <tr><td>producer</td><td>{@link SimpleKuduOperationsProducer}</td><td>No</td>
+ * <td>The fully-qualified class name of the {@link KuduOperationsProducer}
+ * the sink should use.</td></tr>
+ * <tr><td>producer.*</td><td></td><td>(Varies by operations producer)</td>
+ * <td>Configuration properties to pass to the operations producer implementation.</td></tr>
+ * <tr><td>{@link KuduSinkConfigurationConstants#KERBEROS_PRINCIPAL}</td><td></td><td>No</td>
+ * <td>Kerberos principal under which the Kudu client is created.</td></tr>
+ * <tr><td>{@link KuduSinkConfigurationConstants#KERBEROS_KEYTAB}</td><td></td><td>No</td>
+ * <td>Keytab for the Kerberos principal.</td></tr>
+ * <tr><td>{@link KuduSinkConfigurationConstants#PROXY_USER}</td><td></td><td>No</td>
+ * <td>User to impersonate when creating the Kudu client.</td></tr>
+ * </table>
+ *
+ * <p><strong>Installation</strong>
+ *
+ * <p>The sink is built by the {@code flume-ng-kudu-sink} module and depends on the Kudu Java
+ * client ({@code org.apache.kudu:kudu-client}). If these jars are not already on the agent's
+ * classpath, they can be installed through Flume's plugins.d mechanism; see the plugins.d
+ * section of the <a href="https://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory">
+ * Flume User Guide</a>.
+ */
+public class KuduSink extends AbstractSink implements Configurable {
+  private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
+  private static final int DEFAULT_BATCH_SIZE = 1000;
+  private static final Long DEFAULT_TIMEOUT_MILLIS =
+      AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
+  private static final String DEFAULT_KUDU_OPERATION_PRODUCER =
+      "org.apache.flume.sink.kudu.SimpleKuduOperationsProducer";
+  private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
+
+  private String masterAddresses;
+  private String tableName;
+  private int batchSize;
+  private long timeoutMillis;
+  private boolean ignoreDuplicateRows;
+  private KuduTable table;
+  private KuduSession session;
+  private KuduClient client;
+  private KuduOperationsProducer operationsProducer;
+  private SinkCounter sinkCounter;
+  private PrivilegedExecutor privilegedExecutor;
+
+  /** Creates a sink that builds its own Kudu client in {@link #start()}. */
+  public KuduSink() {
+    this(null);
+  }
+
+  /**
+   * Creates a sink that uses the given client instead of building one.
+   *
+   * @param kuduClient the client to use, or {@code null} to build one in {@link #start()}
+   */
+  public KuduSink(KuduClient kuduClient) {
+    this.client = kuduClient;
+  }
+
+  /**
+   * Creates the Kudu client (unless one was supplied), opens a manually flushed session and
+   * the target table, and initializes the operations producer with that table.
+   *
+   * @throws IllegalStateException if the sink is started again without being stopped
+   * @throws FlumeException if the Kudu table cannot be opened
+   */
+  @Override
+  public synchronized void start() {
+    Preconditions.checkState(table == null && session == null,
+        "Please call stop before calling start on an old instance.");
+
+    // Client is not null only inside tests.
+    if (client == null) {
+      // Creating client with FlumeAuthenticator.
+      client = privilegedExecutor.execute(
+          new PrivilegedAction<KuduClient>() {
+            @Override
+            public KuduClient run() {
+              return new KuduClient.KuduClientBuilder(masterAddresses).build();
+            }
+          }
+      );
+    }
+    session = client.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+    session.setTimeoutMillis(timeoutMillis);
+    session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
+    // NOTE(review): the buffer is sized to batchSize, but getOperations() returns a List, so
+    // a producer may emit several operations per event; confirm apply() cannot overflow it.
+    session.setMutationBufferSpace(batchSize);
+
+    try {
+      table = client.openTable(tableName);
+    } catch (Exception ex) {
+      sinkCounter.incrementConnectionFailedCount();
+      String msg = String.format("Could not open Kudu table '%s'", tableName);
+      logger.error(msg, ex);
+      throw new FlumeException(msg, ex);
+    }
+    operationsProducer.initialize(table);
+
+    super.start();
+    sinkCounter.incrementConnectionCreatedCount();
+    sinkCounter.start();
+  }
+
+  /**
+   * Closes the operations producer and shuts down the Kudu client, then marks the sink as
+   * stopped. Cleanup continues past individual failures.
+   *
+   * @throws FlumeException wrapping the first cleanup failure (later ones are suppressed)
+   */
+  @Override
+  public synchronized void stop() {
+    Exception ex = null;
+    try {
+      operationsProducer.close();
+    } catch (Exception e) {
+      ex = e;
+      logger.error("Error closing operations producer", e);
+    }
+    try {
+      if (client != null) {
+        client.shutdown();
+      }
+    } catch (Exception e) {
+      if (ex == null) {
+        ex = e;
+      } else {
+        ex.addSuppressed(e);
+      }
+      logger.error("Error closing client", e);
+    }
+    client = null;
+    table = null;
+    session = null;
+
+    sinkCounter.incrementConnectionClosedCount();
+    sinkCounter.stop();
+    // Pairs with super.start() in start() so AbstractSink's lifecycle state is updated.
+    super.stop();
+    if (ex != null) {
+      throw new FlumeException("Error stopping sink", ex);
+    }
+  }
+
+  /**
+   * Reads the sink configuration, sets up the authenticator used to create the Kudu client,
+   * and instantiates and configures the operations producer named by {@code producer}.
+   *
+   * @throws NullPointerException if {@code masterAddresses} or {@code tableName} is missing
+   * @throws IllegalArgumentException if {@code batchSize} is not positive
+   * @throws FlumeException if the operations producer class cannot be loaded or instantiated
+   */
+  @Override
+  public synchronized void configure(Context context) {
+    masterAddresses = context.getString(KuduSinkConfigurationConstants.MASTER_ADDRESSES);
+    Preconditions.checkNotNull(masterAddresses,
+        "Missing master addresses. Please specify property '%s'.",
+        KuduSinkConfigurationConstants.MASTER_ADDRESSES);
+
+    tableName = context.getString(KuduSinkConfigurationConstants.TABLE_NAME);
+    Preconditions.checkNotNull(tableName,
+        "Missing table name. Please specify property '%s'",
+        KuduSinkConfigurationConstants.TABLE_NAME);
+
+    batchSize = context.getInteger(KuduSinkConfigurationConstants.BATCH_SIZE, DEFAULT_BATCH_SIZE);
+    // A non-positive batch size would make every process() call take no events.
+    Preconditions.checkArgument(batchSize > 0,
+        "Property '%s' must be positive, but was %s.",
+        KuduSinkConfigurationConstants.BATCH_SIZE, batchSize);
+    timeoutMillis = context.getLong(KuduSinkConfigurationConstants.TIMEOUT_MILLIS,
+        DEFAULT_TIMEOUT_MILLIS);
+    ignoreDuplicateRows = context.getBoolean(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
+        DEFAULT_IGNORE_DUPLICATE_ROWS);
+    String operationProducerType = context.getString(KuduSinkConfigurationConstants.PRODUCER);
+    String kerberosPrincipal =
+        context.getString(KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL);
+    String kerberosKeytab = context.getString(KuduSinkConfigurationConstants.KERBEROS_KEYTAB);
+    String proxyUser = context.getString(KuduSinkConfigurationConstants.PROXY_USER);
+
+    privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
+        kerberosPrincipal, kerberosKeytab).proxyAs(proxyUser);
+
+    // Check for operations producer, if null set default operations producer type.
+    if (operationProducerType == null || operationProducerType.isEmpty()) {
+      operationProducerType = DEFAULT_KUDU_OPERATION_PRODUCER;
+      logger.warn("No Kudu operations producer provided, using default");
+    }
+
+    Context producerContext = new Context();
+    producerContext.putAll(context.getSubProperties(
+        KuduSinkConfigurationConstants.PRODUCER_PREFIX));
+
+    try {
+      // asSubclass gives a clear error if the class is not a KuduOperationsProducer.
+      Class<? extends KuduOperationsProducer> clazz =
+          Class.forName(operationProducerType).asSubclass(KuduOperationsProducer.class);
+      operationsProducer = clazz.getDeclaredConstructor().newInstance();
+    } catch (ClassNotFoundException | ClassCastException | NoSuchMethodException |
+        InstantiationException | IllegalAccessException | InvocationTargetException e) {
+      logger.error("Could not instantiate Kudu operations producer", e);
+      throw new FlumeException(
+          String.format("Could not instantiate Kudu operations producer '%s'",
+              operationProducerType), e);
+    }
+    operationsProducer.configure(producerContext);
+    sinkCounter = new SinkCounter(this.getName());
+  }
+
+  /**
+   * @return the Kudu client in use, or {@code null} if none has been supplied or built yet,
+   *     or the sink has been stopped
+   */
+  public synchronized KuduClient getClient() {
+    return client;
+  }
+
+  /**
+   * Moves up to {@code batchSize} events from the channel to Kudu in a single transaction.
+   *
+   * @return {@link Status#BACKOFF} if operations are still pending in the session or the
+   *     channel had no events, {@link Status#READY} otherwise
+   * @throws EventDeliveryException if the batch could not be written, e.g. because of a row
+   *     error; the transaction has been rolled back
+   */
+  @Override
+  public synchronized Status process() throws EventDeliveryException {
+    if (session.hasPendingOperations()) {
+      // If for whatever reason we have pending operations, refuse to process
+      // more and tell the caller to try again a bit later. We don't want to
+      // pile on the KuduSession.
+      // NOTE(review): the session is flushed only further down in this method, so operations
+      // left behind by a batch that failed before its flush (e.g. getOperations() throwing
+      // midway) appear to keep this branch returning BACKOFF indefinitely; confirm.
+      return Status.BACKOFF;
+    }
+
+    Channel channel = getChannel();
+    Transaction txn = channel.getTransaction();
+
+    txn.begin();
+
+    try {
+      long txnEventCount = 0;
+      for (; txnEventCount < batchSize; txnEventCount++) {
+        Event event = channel.take();
+        if (event == null) {
+          break;
+        }
+
+        List<Operation> operations = operationsProducer.getOperations(event);
+        for (Operation o : operations) {
+          session.apply(o);
+        }
+      }
+
+      logger.debug("Flushing {} events", txnEventCount);
+      List<OperationResponse> responses = session.flush();
+      if (responses != null) {
+        for (OperationResponse response : responses) {
+          // Throw an EventDeliveryException if at least one of the responses was
+          // a row error. Row errors can occur for example when an event is inserted
+          // into Kudu successfully but the Flume transaction is rolled back for some reason,
+          // and a subsequent replay of the same Flume transaction leads to a
+          // duplicate key error since the row already exists in Kudu.
+          // Note: Duplicate keys will not be reported as errors if ignoreDuplicateRows
+          // is enabled in the config.
+          if (response.hasRowError()) {
+            throw new EventDeliveryException("Failed to flush one or more changes. " +
+                "Transaction rolled back: " + response.getRowError().toString());
+          }
+        }
+      }
+
+      if (txnEventCount == 0) {
+        sinkCounter.incrementBatchEmptyCount();
+      } else if (txnEventCount == batchSize) {
+        sinkCounter.incrementBatchCompleteCount();
+      } else {
+        sinkCounter.incrementBatchUnderflowCount();
+      }
+
+      txn.commit();
+
+      if (txnEventCount == 0) {
+        return Status.BACKOFF;
+      }
+
+      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+      return Status.READY;
+
+    } catch (Throwable e) {
+      txn.rollback();
+
+      String msg = "Failed to commit transaction. Transaction rolled back.";
+      logger.error(msg, e);
+      if (e instanceof Error) {
+        // Never disguise JVM errors as ordinary exceptions; propagate them unchanged.
+        throw (Error) e;
+      }
+      if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
+      }
+      if (e instanceof EventDeliveryException) {
+        // Already describes the failure (e.g. a row error); don't bury it under a new one.
+        throw (EventDeliveryException) e;
+      }
+      throw new EventDeliveryException(msg, e);
+    } finally {
+      txn.close();
+    }
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSinkConfigurationConstants.java
new file mode 100644
index 0000000..a39f82c
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/KuduSinkConfigurationConstants.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+/**
+ * Names of the configuration properties read by {@link KuduSink}.
+ */
+public class KuduSinkConfigurationConstants {
+  // ---- Cluster and table -------------------------------------------------
+
+  /**
+   * Comma-separated list of "host:port" Kudu master addresses. The port may be
+   * omitted, in which case the Kudu Java client's default master port is used.
+   */
+  public static final String MASTER_ADDRESSES = "masterAddresses";
+
+  /** Name of the Kudu table the sink writes to. */
+  public static final String TABLE_NAME = "tableName";
+
+  /** Timeout applied to Kudu operations, expressed in milliseconds. */
+  public static final String TIMEOUT_MILLIS = "timeoutMillis";
+
+  // ---- Operations producer -----------------------------------------------
+
+  /** Fully qualified class name of the KuduOperationsProducer the sink should use. */
+  public static final String PRODUCER = "producer";
+
+  /** Prefix shared by every property that is forwarded to the KuduOperationsProducer. */
+  public static final String PRODUCER_PREFIX = PRODUCER + ".";
+
+  // ---- Batching and error handling ---------------------------------------
+
+  /** Maximum number of events taken from the channel in a single transaction. */
+  public static final String BATCH_SIZE = "batchSize";
+
+  /** Whether duplicate-primary-key errors caused by inserts should be ignored. */
+  public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
+
+  // ---- Security ----------------------------------------------------------
+
+  /** Path of the keytab file used for Kerberos authentication. */
+  public static final String KERBEROS_KEYTAB = "kerberosKeytab";
+
+  /** Kerberos principal used for authentication. */
+  public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+
+  /** Effective user to act as, when it differs from the Kerberos principal. */
+  public static final String PROXY_USER = "proxyUser";
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/RegexpKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/RegexpKuduOperationsProducer.java
new file mode 100644
index 0000000..a380f97
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/RegexpKuduOperationsProducer.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Upsert;
+
+/**
+ * A regular expression operations producer that generates one or more Kudu
+ * {@link Insert} or {@link Upsert} operations per Flume {@link Event} by
+ * parsing the event {@code body} using a regular expression. Values are
+ * coerced to the types of the named columns in the Kudu table.
+ *
+ * <p>Example: If the Kudu table has the schema:
+ *
+ * <pre>
+ * key INT32
+ * name STRING</pre>
+ *
+ * <p>and {@code producer.pattern = (?<key>\\d+),(?<name>\\w+)} then
+ * {@code RegexpKuduOperationsProducer} will parse the string:
+ *
+ * <pre>|12345,Mike||54321,Todd|</pre>
+ *
+ * into the rows: {@code (key=12345, name=Mike)} and {@code (key=54321, name=Todd)}.
+ *
+ * <p>Note: This class relies on JDK7 named capturing groups, which are
+ * documented in {@link Pattern}. The name of each capturing group must
+ * correspond to a column name in the destination Kudu table.
+ *
+ * <p><strong><code>RegexpKuduOperationsProducer</code> Flume Configuration Parameters</strong></p>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1 summary="Flume Configuration Parameters">
+ * <tr>
+ * <th>Property Name</th>
+ * <th>Default</th>
+ * <th>Required?</th>
+ * <th>Description</th>
+ * </tr>
+ * <tr>
+ * <td>producer.pattern</td>
+ * <td></td>
+ * <td>Yes</td>
+ * <td>The regular expression used to parse the event body.</td>
+ * </tr>
+ * <tr>
+ * <td>producer.encoding</td>
+ * <td>utf-8</td>
+ * <td>No</td>
+ * <td>The character set of the event body.</td>
+ * </tr>
+ * <tr>
+ * <td>producer.operation</td>
+ * <td>upsert</td>
+ * <td>No</td>
+ * <td>Operation type used to write the event to Kudu. Must be either
+ * {@code insert} or {@code upsert}.</td>
+ * </tr>
+ * <tr>
+ * <td>producer.skipMissingColumn</td>
+ * <td>false</td>
+ * <td>No</td>
+ * <td>
+ * <b>@deprecated</b><br/> use {@code producer.missingColumnPolicy}.
+ * What to do if a column in the Kudu table has no corresponding capture group.
+ * If set to {@code true}, a warning message is logged and the operation is still attempted.
+ * If set to {@code false}, an exception is thrown and the sink will not process the
+ * {@code Event}, causing a Flume {@code Channel} rollback.</td>
+ * </tr>
+ * <tr>
+ * <td>producer.skipBadColumnValue</td>
+ * <td>false</td>
+ * <td>No</td>
+ * <td>
+ * <b>@deprecated</b><br/> use {@code producer.badColumnValuePolicy}.
+ * What to do if a value in the pattern match cannot be coerced to the required type.
+ * If set to {@code true}, a warning message is logged and the operation is still attempted.
+ * If set to {@code false}, an exception is thrown and the sink will not process the
+ * {@code Event}, causing a Flume {@code Channel} rollback.</td>
+ * </tr>
+ * <tr>
+ * <td>producer.skipUnmatchedRows</td>
+ * <td>true</td>
+ * <td>No</td>
+ * <td>
+ * <b>@deprecated</b><br/> use {@code producer.unmatchedRowPolicy}.
+ * Whether to log a warning about payloads that do not match the pattern. If set to
+ * {@code false}, event bodies with no matches will be silently dropped. (Despite the
+ * property name, unmatched rows are skipped in both cases; {@code true} only adds
+ * the warning.)</td>
+ * </tr>
+ * <tr>
+ * <td>producer.missingColumnPolicy</td>
+ * <td>REJECT</td>
+ * <td>No</td>
+ * <td>What to do if a column in the Kudu table has no corresponding capture group.<br/>
+ * If set to {@code REJECT}, an exception is thrown and the sink will not process the
+ * {@code Event}, causing a Flume {@code Channel} rollback.<br/>
+ * If set to {@code WARN}, a warning message is logged and the operation is still produced.<br/>
+ * If set to {@code IGNORE}, the operation is still produced without any log message.</td>
+ * </tr>
+ * <tr>
+ * <td>producer.badColumnValuePolicy</td>
+ * <td>REJECT</td>
+ * <td>No</td>
+ * <td>What to do if a value in the pattern match cannot be coerced to the required type.<br/>
+ * If set to {@code REJECT}, an exception is thrown and the sink will not process the
+ * {@code Event}, causing a Flume {@code Channel} rollback.<br/>
+ * If set to {@code WARN}, a warning message is logged and the operation is still produced,
+ * but does not include the given column.<br/>
+ * If set to {@code IGNORE}, the operation is still produced, but does not include the given
+ * column and does not log any message.</td>
+ * </tr>
+ * <tr>
+ * <td>producer.unmatchedRowPolicy</td>
+ * <td>WARN</td>
+ * <td>No</td>
+ * <td>What to do if a payload does not match the pattern.<br/>
+ * If set to {@code REJECT}, an exception is thrown and the sink will not process the
+ * {@code Event}, causing a Flume {@code Channel} rollback.<br/>
+ * If set to {@code WARN}, a warning message is logged and the row is skipped,
+ * not producing an operation.<br/>
+ * If set to {@code IGNORE}, the row is skipped without any log message.</td>
+ * </tr>
+ * </table>
+ *
+ * @see Pattern
+ */
+public class RegexpKuduOperationsProducer implements KuduOperationsProducer {
+  private static final Logger logger = LoggerFactory.getLogger(RegexpKuduOperationsProducer.class);
+  private static final String INSERT = "insert";
+  private static final String UPSERT = "upsert";
+  private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT);
+
+  public static final String PATTERN_PROP = "pattern";
+  // Charset of the event body; documented as "producer.encoding".
+  public static final String ENCODING_PROP = "encoding";
+  public static final String DEFAULT_ENCODING = "utf-8";
+  public static final String OPERATION_PROP = "operation";
+  public static final String DEFAULT_OPERATION = UPSERT;
+  @Deprecated
+  public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
+  @Deprecated
+  public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
+  @Deprecated
+  public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
+  @Deprecated
+  public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
+  // NOTE: the key is "skipUnmatchedRows" although the constant name (and the semantics:
+  // true = WARN, false = IGNORE) suggest "warnUnmatchedRows". The key is kept unchanged
+  // so that existing configurations continue to work.
+  @Deprecated
+  public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows";
+  @Deprecated
+  public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;
+  public static final String MISSING_COLUMN_POLICY_PROP = "missingColumnPolicy";
+  public static final ParseErrorPolicy DEFAULT_MISSING_COLUMN_POLICY = ParseErrorPolicy.REJECT;
+  public static final String BAD_COLUMN_VALUE_POLICY_PROP = "badColumnValuePolicy";
+  public static final ParseErrorPolicy DEFAULT_BAD_COLUMN_VALUE_POLICY = ParseErrorPolicy.REJECT;
+  public static final String UNMATCHED_ROW_POLICY_PROP = "unmatchedRowPolicy";
+  public static final ParseErrorPolicy DEFAULT_UNMATCHED_ROW_POLICY = ParseErrorPolicy.WARN;
+
+  private KuduTable table;
+  private Pattern pattern;
+  private Charset charset;
+  private String operation;
+  private ParseErrorPolicy missingColumnPolicy;
+  private ParseErrorPolicy badColumnValuePolicy;
+  private ParseErrorPolicy unmatchedRowPolicy;
+
+  public RegexpKuduOperationsProducer() {
+  }
+
+  /**
+   * Reads and validates the producer configuration.
+   *
+   * @throws IllegalArgumentException if the pattern is missing or invalid, the operation
+   *     is unrecognized, both a deprecated and its replacement property are set, or a
+   *     policy name is unknown
+   * @throws FlumeException if the configured charset is invalid or unsupported
+   */
+  @Override
+  public void configure(Context context) {
+    String regexp = context.getString(PATTERN_PROP);
+    Preconditions.checkArgument(regexp != null,
+        "Required parameter %s is not specified",
+        PATTERN_PROP);
+    try {
+      pattern = Pattern.compile(regexp);
+    } catch (PatternSyntaxException e) {
+      throw new IllegalArgumentException(
+          String.format("The pattern '%s' is invalid", regexp), e);
+    }
+    String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING);
+    try {
+      charset = Charset.forName(charsetName);
+    } catch (IllegalArgumentException e) {
+      throw new FlumeException(
+          String.format("Invalid or unsupported charset %s", charsetName), e);
+    }
+    operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase(Locale.ENGLISH);
+    Preconditions.checkArgument(
+        validOperations.contains(operation),
+        "Unrecognized operation '%s'",
+        operation);
+
+    // Each policy may come from its deprecated boolean property (true -> first value,
+    // false -> second value) or from the new enum-valued property.
+    missingColumnPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
+        context, SKIP_MISSING_COLUMN_PROP, MISSING_COLUMN_POLICY_PROP,
+        ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_MISSING_COLUMN_POLICY
+    );
+
+    badColumnValuePolicy = getParseErrorPolicyCheckingDeprecatedProperty(
+        context, SKIP_BAD_COLUMN_VALUE_PROP, BAD_COLUMN_VALUE_POLICY_PROP,
+        ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_BAD_COLUMN_VALUE_POLICY
+    );
+
+    unmatchedRowPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
+        context, WARN_UNMATCHED_ROWS_PROP, UNMATCHED_ROW_POLICY_PROP,
+        ParseErrorPolicy.WARN, ParseErrorPolicy.IGNORE, DEFAULT_UNMATCHED_ROW_POLICY
+    );
+  }
+
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Produces one Kudu operation for every match of the configured pattern in the
+   * event body (decoded with the configured charset).
+   *
+   * @param event the Flume event to convert
+   * @return the produced operations; empty if nothing matched and the unmatched-row
+   *     policy is not {@code REJECT}
+   * @throws FlumeException if an applicable policy is {@code REJECT} and its error
+   *     occurs, or if building the Kudu operation fails unexpectedly
+   */
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    String raw = new String(event.getBody(), charset);
+    Matcher m = pattern.matcher(raw);
+    boolean match = false;
+    Schema schema = table.getSchema();
+    List<Operation> ops = Lists.newArrayList();
+    // Each successive match of the pattern becomes its own row.
+    while (m.find()) {
+      match = true;
+      Operation op;
+      switch (operation) {
+        case UPSERT:
+          op = table.newUpsert();
+          break;
+        case INSERT:
+          op = table.newInsert();
+          break;
+        default:
+          throw new FlumeException(
+              String.format("Unrecognized operation type '%s' in getOperations(): " +
+                  "this should never happen!", operation));
+      }
+      PartialRow row = op.getRow();
+      for (ColumnSchema col : schema.getColumns()) {
+        try {
+          coerceAndSet(m.group(col.getName()), col.getName(), col.getType(), row);
+        } catch (NumberFormatException e) {
+          // Must precede the IllegalArgumentException handler: NumberFormatException is
+          // a subclass of it and signals a value that could not be parsed.
+          String msg = String.format(
+              "Raw value '%s' couldn't be parsed to type %s for column '%s'",
+              raw, col.getType(), col.getName());
+          logOrThrow(badColumnValuePolicy, msg, e);
+        } catch (IllegalArgumentException e) {
+          // Matcher.group(String) throws this when the pattern has no group by that name.
+          String msg = String.format(
+              "Column '%s' has no matching group in '%s'",
+              col.getName(), raw);
+          logOrThrow(missingColumnPolicy, msg, e);
+        } catch (Exception e) {
+          throw new FlumeException("Failed to create Kudu operation", e);
+        }
+      }
+      ops.add(op);
+    }
+    if (!match) {
+      String msg = String.format("Failed to match the pattern '%s' in '%s'", pattern, raw);
+      logOrThrow(unmatchedRowPolicy, msg, null);
+    }
+    return ops;
+  }
+
+  /**
+   * Coerces the string {@code rawVal} to the type {@code type} and sets the resulting
+   * value for column {@code colName} in {@code row}. Columns of any other type are
+   * skipped with a warning.
+   *
+   * @param rawVal the raw string column value
+   * @param colName the name of the column
+   * @param type the Kudu type to convert {@code rawVal} to
+   * @param row the row to set the value in
+   * @throws NumberFormatException if {@code rawVal} cannot be parsed as {@code type}
+   */
+  private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row)
+      throws NumberFormatException {
+    switch (type) {
+      case BOOL:
+        row.addBoolean(colName, Boolean.parseBoolean(rawVal));
+        break;
+      case INT8:
+        row.addByte(colName, Byte.parseByte(rawVal));
+        break;
+      case INT16:
+        row.addShort(colName, Short.parseShort(rawVal));
+        break;
+      case INT32:
+        row.addInt(colName, Integer.parseInt(rawVal));
+        break;
+      case INT64: // Fall through
+      case UNIXTIME_MICROS:
+        row.addLong(colName, Long.parseLong(rawVal));
+        break;
+      case FLOAT:
+        row.addFloat(colName, Float.parseFloat(rawVal));
+        break;
+      case DOUBLE:
+        row.addDouble(colName, Double.parseDouble(rawVal));
+        break;
+      case BINARY:
+        row.addBinary(colName, rawVal.getBytes(charset));
+        break;
+      case STRING:
+        row.addString(colName, rawVal);
+        break;
+      default:
+        logger.warn("got unknown type {} for column '{}'-- ignoring this column",
+            type, colName);
+    }
+  }
+
+  /**
+   * Applies a parse-error policy: {@code REJECT} throws, {@code WARN} logs,
+   * {@code IGNORE} does nothing.
+   *
+   * @param policy the policy to apply
+   * @param msg the error description
+   * @param e the underlying cause, may be {@code null}
+   * @throws FlumeException if {@code policy} is {@code REJECT}
+   */
+  private void logOrThrow(ParseErrorPolicy policy, String msg, Exception e)
+      throws FlumeException {
+    switch (policy) {
+      case REJECT:
+        throw new FlumeException(msg, e);
+      case WARN:
+        logger.warn(msg, e);
+        break;
+      case IGNORE:
+        // Fall through
+      default:
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Resolves a parse-error policy from either a deprecated boolean property or its
+   * enum-valued replacement.
+   *
+   * @param deprecatedPropertyName the legacy boolean property
+   * @param newPropertyName the replacement property holding a policy name
+   * @param trueValue policy used when the deprecated property is {@code true}
+   * @param falseValue policy used when the deprecated property is not {@code true}
+   * @param defaultValue policy used when neither property is set
+   * @throws IllegalArgumentException if both properties are set or the policy name is unknown
+   */
+  private ParseErrorPolicy getParseErrorPolicyCheckingDeprecatedProperty(
+      Context context, String deprecatedPropertyName, String newPropertyName,
+      ParseErrorPolicy trueValue, ParseErrorPolicy falseValue, ParseErrorPolicy defaultValue) {
+    ParseErrorPolicy policy;
+    if (context.containsKey(deprecatedPropertyName)) {
+      logger.info("Configuration property {} is deprecated. Use {} instead.",
+          deprecatedPropertyName, newPropertyName);
+      // Guava's Preconditions substitutes %s placeholders, not SLF4J-style {}.
+      Preconditions.checkArgument(!context.containsKey(newPropertyName),
+          "Both %s and %s specified. Use only one of them, preferably %s.",
+          deprecatedPropertyName, newPropertyName, newPropertyName);
+      policy = context.getBoolean(deprecatedPropertyName) ? trueValue : falseValue;
+    } else {
+      String policyString = context.getString(newPropertyName, defaultValue.name());
+      try {
+        policy = ParseErrorPolicy.valueOf(policyString.toUpperCase(Locale.ENGLISH));
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgumentException(
+            "Unknown policy '" + policyString + "'. Use one of the following: " +
+                Arrays.toString(ParseErrorPolicy.values()), e);
+      }
+    }
+
+    return policy;
+  }
+
+  private enum ParseErrorPolicy {
+    WARN,
+    IGNORE,
+    REJECT
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKeyedKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKeyedKuduOperationsProducer.java
new file mode 100644
index 0000000..5287927
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKeyedKuduOperationsProducer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.Upsert;
+
+/**
+ * A simple operations producer that generates one {@link Insert} or {@link Upsert}
+ * per {@link Event} by writing the event body into a BINARY column. The pair
+ * (key column name, key column value) should be a header in the {@link Event};
+ * the column name is configurable but the column type must be STRING. Multiple
+ * key columns are not supported.
+ *
+ * <p><strong>Simple Keyed Kudu Operations Producer configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1
+ * summary="Simple Keyed Kudu Operations Producer configuration parameters">
+ * <tr>
+ * <th>Property Name</th>
+ * <th>Default</th>
+ * <th>Required?</th>
+ * <th>Description</th>
+ * </tr>
+ * <tr>
+ * <td>producer.payloadColumn</td>
+ * <td>payload</td>
+ * <td>No</td>
+ * <td>The name of the BINARY column to write the Flume event body to.</td>
+ * </tr>
+ * <tr>
+ * <td>producer.keyColumn</td>
+ * <td>key</td>
+ * <td>No</td>
+ * <td>The name of the STRING key column of the target Kudu table.</td>
+ * </tr>
+ * <tr>
+ * <td>producer.operation</td>
+ * <td>upsert</td>
+ * <td>No</td>
+ * <td>The operation used to write events to Kudu. Supported operations
+ * are 'insert' and 'upsert' (case-insensitive).</td>
+ * </tr>
+ * </table>
+ */
+public class SimpleKeyedKuduOperationsProducer implements KuduOperationsProducer {
+  public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
+  public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
+  public static final String KEY_COLUMN_PROP = "keyColumn";
+  public static final String KEY_COLUMN_DEFAULT = "key";
+  public static final String OPERATION_PROP = "operation";
+  public static final String OPERATION_DEFAULT = "upsert";
+
+  private KuduTable table;
+  private String payloadColumn;
+  private String keyColumn;
+  // Operation name, lower-cased by configure(); empty until configure() runs.
+  private String operation = "";
+
+  public SimpleKeyedKuduOperationsProducer() {
+  }
+
+  /**
+   * Reads the producer configuration.
+   *
+   * @throws IllegalArgumentException if the configured operation is neither
+   *     {@code insert} nor {@code upsert}
+   */
+  @Override
+  public void configure(Context context) {
+    payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
+    keyColumn = context.getString(KEY_COLUMN_PROP, KEY_COLUMN_DEFAULT);
+    String configuredOperation = context.getString(OPERATION_PROP, OPERATION_DEFAULT);
+    operation = configuredOperation.toLowerCase(Locale.ENGLISH);
+    // Fail fast on a bad value instead of rejecting (and rolling back) every event later.
+    if (!"upsert".equals(operation) && !"insert".equals(operation)) {
+      throw new IllegalArgumentException(
+          String.format("Unrecognized operation '%s'", configuredOperation));
+    }
+  }
+
+  @Override
+  public void initialize(KuduTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Builds a single operation whose key column comes from the event header named
+   * after the key column and whose payload column holds the raw event body.
+   *
+   * @throws FlumeException if the key header is missing, the operation is
+   *     unexpected, or the row cannot be populated
+   */
+  @Override
+  public List<Operation> getOperations(Event event) throws FlumeException {
+    String key = event.getHeaders().get(keyColumn);
+    if (key == null) {
+      throw new FlumeException(
+          String.format("No value provided for key column %s", keyColumn));
+    }
+    try {
+      Operation op;
+      // toLowerCase kept in case configure() was bypassed.
+      switch (operation.toLowerCase(Locale.ENGLISH)) {
+        case "upsert":
+          op = table.newUpsert();
+          break;
+        case "insert":
+          op = table.newInsert();
+          break;
+        default:
+          throw new FlumeException(
+              String.format("Unexpected operation %s", operation));
+      }
+      PartialRow row = op.getRow();
+      row.addString(keyColumn, key);
+      row.addBinary(payloadColumn, event.getBody());
+
+      return Collections.singletonList(op);
+    } catch (FlumeException e) {
+      // Already descriptive; do not wrap it a second time.
+      throw e;
+    } catch (Exception e) {
+      throw new FlumeException("Failed to create Kudu Operation object", e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+}
+
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKuduOperationsProducer.java
new file mode 100644
index 0000000..70c91b8
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/main/java/org/apache/flume/sink/kudu/SimpleKuduOperationsProducer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+
+/**
+ * A simple serializer that generates one {@link Insert} per {@link Event}
+ * by writing the event body into a BINARY column. The headers are discarded.
+ *
+ * <p><strong>Simple Kudu Event Producer configuration parameters</strong>
+ *
+ * <table cellpadding=3 cellspacing=0 border=1
+ * summary="Simple Kudu Event Producer configuration parameters">
+ * <tr>
+ * <th>Property Name</th>
+ * <th>Default</th>
+ * <th>Required?</th>
+ * <th>Description</th>
+ * </tr>
+ * <tr>
+ * <td>producer.payloadColumn</td>
+ * <td>payload</td>
+ * <td>No</td>
+ * <td>The name of the BINARY column to write the Flume the event body to.</td>
+ * </tr>
+ * </table>
+ */
+public class SimpleKuduOperationsProducer implements KuduOperationsProducer {
+ public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
+ public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
+
+ private KuduTable table;
+ private String payloadColumn;
+
+ public SimpleKuduOperationsProducer() {
+ }
+
+ @Override
+ public void configure(Context context) {
+ payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
+ }
+
+ @Override
+ public void initialize(KuduTable table) {
+ this.table = table;
+ }
+
+ @Override
+ public List<Operation> getOperations(Event event) throws FlumeException {
+ try {
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ row.addBinary(payloadColumn, event.getBody());
+
+ return Collections.singletonList((Operation) insert);
+ } catch (Exception e) {
+ throw new FlumeException("Failed to create Kudu Insert object", e);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/KuduSinkTestUtil.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/KuduSinkTestUtil.java
new file mode 100644
index 0000000..253657e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/KuduSinkTestUtil.java
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.flume.sink.kudu;
+
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.KuduClient;
+
+class KuduSinkTestUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTestUtil.class);
+
+ static KuduSink createSink(KuduClient client, String tableName, Context ctx) {
+ return createSink(tableName, client, ctx, client.getMasterAddressesAsString());
+ }
+
+ private static KuduSink createSink(
+ String tableName, KuduClient client, Context ctx, String masterAddresses) {
+ LOG.info("Creating Kudu sink for '{}' table...", tableName);
+
+ Context context = new Context();
+ context.put(KuduSinkConfigurationConstants.TABLE_NAME, tableName);
+ context.put(KuduSinkConfigurationConstants.MASTER_ADDRESSES, masterAddresses);
+ context.putAll(ctx.getParameters());
+ KuduSink sink = new KuduSink(client);
+ Configurables.configure(sink, context);
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, new Context());
+ sink.setChannel(channel);
+
+ LOG.info("Created Kudu sink for '{}' table.", tableName);
+
+ return sink;
+ }
+
+ static KuduSink createSecureSink(String tableName, String masterAddresses, String clusterRoot) {
+ Context context = new Context();
+ context.put(KuduSinkConfigurationConstants.KERBEROS_KEYTAB, clusterRoot +
+ "/krb5kdc/test-user.keytab");
+ context.put(KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL, "test-user@KRBTEST.COM");
+
+ return createSink(tableName, null, context, masterAddresses);
+ }
+
+ static void processEventsCreatingSink(
+ KuduClient syncClient, Context context, String tableName, List<Event> events)
+ throws EventDeliveryException {
+ KuduSink sink = createSink(syncClient, tableName, context);
+ sink.start();
+ processEvents(sink, events);
+ }
+
+ static void processEvents(KuduSink sink, List<Event> events) throws EventDeliveryException {
+ Channel channel = sink.getChannel();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ for (Event e : events) {
+ channel.put(e);
+ }
+ tx.commit();
+ tx.close();
+
+ Status status = sink.process();
+ if (events.isEmpty()) {
+ assertSame("incorrect status for empty channel", status, Status.BACKOFF);
+ } else {
+ assertNotSame("incorrect status for non-empty channel", status, Status.BACKOFF);
+ }
+ }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestAvroKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestAvroKuduOperationsProducer.java
new file mode 100644
index 0000000..04cc676
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestAvroKuduOperationsProducer.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.test.KuduTestHarness;
+
+public class TestAvroKuduOperationsProducer {
+  // The test schema, as a file URI and as its literal JSON text; loaded once per class.
+  private static String schemaUriString;
+  private static String schemaLiteral;
+
+  static {
+    try {
+      String resourceName = "/testAvroKuduOperationsProducer.avsc";
+      URL resource = TestAvroKuduOperationsProducer.class.getResource(resourceName);
+      File avscFile = Paths.get(resource.toURI()).toFile();
+      schemaUriString = avscFile.getAbsoluteFile().toURI().toString();
+      schemaLiteral = Files.toString(avscFile, UTF_8);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Where the Avro schema for the test records is made available to the producer. */
+  enum SchemaLocation {
+    GLOBAL, URL, LITERAL
+  }
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  @Test
+  public void testEmptyChannel() throws Exception {
+    testEvents(0, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testOneEvent() throws Exception {
+    testEvents(1, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testThreeEvents() throws Exception {
+    testEvents(3, SchemaLocation.GLOBAL);
+  }
+
+  @Test
+  public void testThreeEventsSchemaURLInEvent() throws Exception {
+    testEvents(3, SchemaLocation.URL);
+  }
+
+  @Test
+  public void testThreeEventsSchemaLiteralInEvent() throws Exception {
+    testEvents(3, SchemaLocation.LITERAL);
+  }
+
+  /**
+   * Writes {@code eventCount} Avro-encoded events through an AvroKuduOperationsProducer-backed
+   * sink and checks the resulting table contents row by row.
+   */
+  private void testEvents(int eventCount, SchemaLocation schemaLocation)
+      throws Exception {
+    KuduTable table = createNewTable(
+        String.format("test%sevents%s", eventCount, schemaLocation));
+
+    Context context;
+    if (schemaLocation == SchemaLocation.GLOBAL) {
+      // Only the GLOBAL case configures the schema on the producer; the other cases carry it
+      // in each event's headers instead.
+      String schemaKey =
+          KuduSinkConfigurationConstants.PRODUCER_PREFIX + AvroKuduOperationsProducer.SCHEMA_PROP;
+      context = new Context(ImmutableMap.of(schemaKey, schemaUriString));
+    } else {
+      context = new Context();
+    }
+    context.put(KuduSinkConfigurationConstants.PRODUCER,
+        AvroKuduOperationsProducer.class.getName());
+
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, table.getName(),
+        generateEvents(eventCount, schemaLocation));
+
+    List<String> expected = makeAnswers(eventCount);
+    List<String> actual = scanTableToStrings(table);
+    assertEquals("wrong number of rows inserted", expected.size(), actual.size());
+    assertArrayEquals("wrong rows inserted", expected.toArray(), actual.toArray());
+  }
+
+  /** Creates a table whose columns mirror the fields of the Avro test record. */
+  private KuduTable createNewTable(String tableName) throws Exception {
+    Schema schema = new Schema(ImmutableList.of(
+        new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build(),
+        new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build(),
+        new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING)
+            .nullable(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build()));
+    CreateTableOptions options = new CreateTableOptions()
+        .setRangePartitionColumns(ImmutableList.of("key"))
+        .setNumReplicas(1);
+    return harness.getClient().createTable(tableName, schema, options);
+  }
+
+  /**
+   * Builds one event per record index; for URL/LITERAL locations the schema is attached to
+   * each event as a header.
+   */
+  private List<Event> generateEvents(int eventCount,
+      SchemaLocation schemaLocation) throws Exception {
+    List<Event> events = new ArrayList<>();
+    for (int n = 0; n < eventCount; n++) {
+      Event event = EventBuilder.withBody(serialize(buildRecord(n)));
+      switch (schemaLocation) {
+        case URL:
+          event.setHeaders(ImmutableMap.of(AvroKuduOperationsProducer.SCHEMA_URL_HEADER,
+              schemaUriString));
+          break;
+        case LITERAL:
+          event.setHeaders(ImmutableMap.of(AvroKuduOperationsProducer.SCHEMA_LITERAL_HEADER,
+              schemaLiteral));
+          break;
+        default:
+          // GLOBAL: the schema comes from the producer configuration, no header needed.
+          break;
+      }
+      events.add(event);
+    }
+    return events;
+  }
+
+  /** Builds the n-th test record; {@code nullableField} is null for even n. */
+  private static AvroKuduOperationsProducerTestRecord buildRecord(int n) {
+    AvroKuduOperationsProducerTestRecord record = new AvroKuduOperationsProducerTestRecord();
+    record.setKey(10 * n);
+    record.setLongField(2L * n);
+    record.setDoubleField(2.71828 * n);
+    record.setNullableField(n % 2 == 0 ? null : "taco");
+    record.setStringField(String.format("hello %d", n));
+    return record;
+  }
+
+  /** Encodes a single record with Avro's binary encoder and returns the bytes. */
+  private static byte[] serialize(AvroKuduOperationsProducerTestRecord record)
+      throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+    DatumWriter<AvroKuduOperationsProducerTestRecord> writer =
+        new SpecificDatumWriter<>(AvroKuduOperationsProducerTestRecord.class);
+    writer.write(record, encoder);
+    encoder.flush();
+    return out.toByteArray();
+  }
+
+  /** Returns the expected row strings for {@code eventCount} records, sorted. */
+  private List<String> makeAnswers(int eventCount) {
+    List<String> answers = Lists.newArrayList();
+    for (int n = 0; n < eventCount; n++) {
+      String nullableValue = (n % 2 == 0) ? "NULL" : "taco";
+      answers.add(String.format(
+          "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " +
+          "STRING nullableField=%s, STRING stringField=hello %s",
+          10 * n, 2 * n, 2.71828 * n, nullableValue, n));
+    }
+    Collections.sort(answers);
+    return answers;
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKeyedKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKeyedKuduOperationsProducer.java
new file mode 100644
index 0000000..a7cccf5
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKeyedKuduOperationsProducer.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.kudu;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.test.KuduTestHarness;
+
+public class TestKeyedKuduOperationsProducer {
+  private static final Logger LOG = LoggerFactory.getLogger(TestKeyedKuduOperationsProducer.class);
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  /**
+   * Creates a table with a STRING key column and a BINARY payload column, named after the
+   * {@link SimpleKeyedKuduOperationsProducer} default column names.
+   */
+  private KuduTable createNewTable(String tableName) throws Exception {
+    LOG.info("Creating new table...");
+
+    ArrayList<ColumnSchema> columns = new ArrayList<>(2);
+    columns.add(
+        new ColumnSchema.ColumnSchemaBuilder(
+            SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT, Type.STRING)
+            .key(true).build());
+    columns.add(
+        new ColumnSchema.ColumnSchemaBuilder(
+            SimpleKeyedKuduOperationsProducer.PAYLOAD_COLUMN_DEFAULT, Type.BINARY)
+            .key(false).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions()
+            .setRangePartitionColumns(ImmutableList.of(
+                SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT))
+            .setNumReplicas(1);
+    KuduTable table =
+        harness.getClient().createTable(tableName, new Schema(columns), createOptions);
+
+    LOG.info("Created new table.");
+
+    return table;
+  }
+
+  @Test
+  public void testEmptyChannelWithInsert() throws Exception {
+    testEvents(0, "insert");
+  }
+
+  @Test
+  public void testOneEventWithInsert() throws Exception {
+    testEvents(1, "insert");
+  }
+
+  @Test
+  public void testThreeEventsWithInsert() throws Exception {
+    testEvents(3, "insert");
+  }
+
+  @Test
+  public void testEmptyChannelWithUpsert() throws Exception {
+    testEvents(0, "upsert");
+  }
+
+  @Test
+  public void testOneEventWithUpsert() throws Exception {
+    testEvents(1, "upsert");
+  }
+
+  @Test
+  public void testThreeEventsWithUpsert() throws Exception {
+    testEvents(3, "upsert");
+  }
+
+  @Test
+  public void testDuplicateRowsWithUpsert() throws Exception {
+    LOG.info("Testing events with upsert...");
+
+    KuduTable table = createNewTable("testDupUpsertEvents");
+    String tableName = table.getName();
+    KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName,
+        createContext("upsert"));
+    sink.start();
+
+    int numRows = 3;
+    KuduSinkTestUtil.processEvents(sink, getEvents(numRows));
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(numRows + " row(s) expected", numRows, rows.size());
+    for (int i = 0; i < numRows; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+
+    // Re-send key 0 with a new payload: the row count must stay the same and the
+    // existing row must now carry the new payload.
+    Event dup = keyedEvent("payload body upserted", String.format("key %s", 0));
+    KuduSinkTestUtil.processEvents(sink, ImmutableList.of(dup));
+
+    List<String> upRows = scanTableToStrings(table);
+    assertEquals(numRows + " row(s) expected", numRows, upRows.size());
+    assertTrue("incorrect payload", upRows.get(0).contains("payload body upserted"));
+    for (int i = 1; i < numRows; i++) {
+      assertTrue("incorrect payload", upRows.get(i).contains("payload body " + i));
+    }
+
+    LOG.info("Testing events with upsert finished successfully.");
+  }
+
+  /** Sends {@code eventCount} keyed events using {@code operation} and verifies each row. */
+  private void testEvents(int eventCount, String operation) throws Exception {
+    LOG.info("Testing {} events...", eventCount);
+
+    KuduTable table = createNewTable("test" + eventCount + "events" + operation);
+    String tableName = table.getName();
+    Context context = createContext(operation);
+
+    List<Event> events = getEvents(eventCount);
+
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+    // NOTE(review): relies on rows coming back in key order ("key 0" < "key 1" < ...), which
+    // only matches numeric order while eventCount < 10.
+    for (int i = 0; i < eventCount; i++) {
+      assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+    }
+
+    LOG.info("Testing {} events finished successfully.", eventCount);
+  }
+
+  /**
+   * Builds a sink context that selects {@link SimpleKeyedKuduOperationsProducer} with the
+   * given operation ("insert" or "upsert" in these tests).
+   */
+  private static Context createContext(String operation) {
+    return new Context(ImmutableMap.of(
+        KuduSinkConfigurationConstants.PRODUCER_PREFIX +
+            SimpleKeyedKuduOperationsProducer.OPERATION_PROP, operation,
+        KuduSinkConfigurationConstants.PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()
+    ));
+  }
+
+  /**
+   * Builds an event with a UTF-8 {@code payload} body and {@code key} stored under the
+   * producer's default key header, which is also the table's key column name.
+   */
+  private static Event keyedEvent(String payload, String key) {
+    Event e = EventBuilder.withBody(payload, UTF_8);
+    e.setHeaders(ImmutableMap.of(SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT, key));
+    return e;
+  }
+
+  /** Returns events i = 0..eventCount-1 with body "payload body i" and key "key i". */
+  private List<Event> getEvents(int eventCount) {
+    List<Event> events = new ArrayList<>();
+    for (int i = 0; i < eventCount; i++) {
+      events.add(keyedEvent(String.format("payload body %s", i), String.format("key %s", i)));
+    }
+    return events;
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKuduSink.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKuduSink.java
new file mode 100644
index 0000000..c242262
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestKuduSink.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.kudu;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Sink;
+import org.apache.flume.Sink.Status;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+
+public class TestKuduSink {
+ private static final Logger LOG = LoggerFactory.getLogger(TestKuduSink.class);
+
+ @Rule
+ public KuduTestHarness harness = new KuduTestHarness();
+
+ private KuduTable createNewTable(String tableName) throws Exception {
+ LOG.info("Creating new table...");
+
+ ArrayList<ColumnSchema> columns = new ArrayList<>(1);
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
+ CreateTableOptions createOptions =
+ new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
+ .setNumReplicas(1);
+ KuduTable table = harness.getClient().createTable(tableName, new Schema(columns),
+ createOptions);
+
+ LOG.info("Created new table.");
+
+ return table;
+ }
+
+ @Test
+ public void testMandatoryParameters() {
+ LOG.info("Testing mandatory parameters...");
+
+ KuduSink sink = new KuduSink(harness.getClient());
+
+ HashMap<String, String> parameters = new HashMap<>();
+ Context context = new Context(parameters);
+ try {
+ Configurables.configure(sink, context);
+ Assert.fail("Should have failed due to missing properties");
+ } catch (NullPointerException npe) {
+ //good
+ }
+
+ parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "tableName");
+ context = new Context(parameters);
+ try {
+ Configurables.configure(sink, context);
+ Assert.fail("Should have failed due to missing properties");
+ } catch (NullPointerException npe) {
+ //good
+ }
+
+ LOG.info("Testing mandatory parameters finished successfully.");
+ }
+
+ @Test(expected = FlumeException.class)
+ public void testMissingTable() {
+ LOG.info("Testing missing table...");
+
+ KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), "missingTable",
+ new Context());
+ sink.start();
+
+ LOG.info("Testing missing table finished successfully.");
+ }
+
+ @Test
+ public void testEmptyChannelWithDefaults() throws Exception {
+ testEventsWithDefaults(0);
+ }
+
+ @Test
+ public void testOneEventWithDefaults() throws Exception {
+ testEventsWithDefaults(1);
+ }
+
+ @Test
+ public void testThreeEventsWithDefaults() throws Exception {
+ testEventsWithDefaults(3);
+ }
+
+ @Test
+ public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
+ doTestDuplicateRows(true);
+ }
+
+ @Test
+ public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
+ doTestDuplicateRows(false);
+ }
+
+ private void doTestDuplicateRows(boolean ignoreDuplicateRows) throws Exception {
+ KuduTable table = createNewTable("testDuplicateRows" + ignoreDuplicateRows);
+ String tableName = table.getName();
+ Context sinkContext = new Context();
+ sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
+ Boolean.toString(ignoreDuplicateRows));
+ KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName, sinkContext);
+ sink.start();
+ Channel channel = sink.getChannel();
+
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+
+ for (int i = 0; i < 2; i++) {
+ Event e = EventBuilder.withBody("key-0", UTF_8); // Duplicate keys.
+ channel.put(e);
+ }
+
+ tx.commit();
+ tx.close();
+
+ try {
+ Sink.Status status = sink.process();
+ if (!ignoreDuplicateRows) {
+ fail("Incorrectly ignored duplicate rows!");
+ }
+ assertSame("incorrect status for empty channel", status, Status.READY);
+ } catch (EventDeliveryException e) {
+ if (ignoreDuplicateRows) {
+ throw new AssertionError("Failed to ignore duplicate rows!", e);
+ } else {
+ LOG.info("Correctly did not ignore duplicate rows", e);
+ return;
+ }
+ }
+
+ // We only get here if the process() succeeded.
+ try {
+ List<String> rows = scanTableToStrings(table);
+ assertEquals("1 row expected", 1, rows.size());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ LOG.info("Testing duplicate events finished successfully.");
+ }
+
+ private void testEventsWithDefaults(int eventCount) throws Exception {
+ LOG.info("Testing {} events...", eventCount);
+
+ KuduTable table = createNewTable("test" + eventCount + "events");
+ String tableName = table.getName();
+
+ List<Event> events = new ArrayList<>();
+
+ for (int i = 0; i < eventCount; i++) {
+ Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
+ events.add(e);
+ }
+
+ KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), new Context(), tableName,
+ events);
+
+ List<String> rows = scanTableToStrings(table);
+ assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+ for (int i = 0; i < eventCount; i++) {
+ assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+ }
+
+ LOG.info("Testing {} events finished successfully.", eventCount);
+ }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducer.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducer.java
new file mode 100644
index 0000000..f8e9c41
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducer.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.kudu;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+
+public class TestRegexpKuduOperationsProducer {
+  // One named group per table column. The decimal point is escaped in both floating-point
+  // groups so only a literal '.' is accepted between the digit runs.
+  private static final String TEST_REGEXP =
+      "(?<key>\\d+),(?<byteFld>\\d+),(?<shortFld>\\d+),(?<intFld>\\d+)," +
+      "(?<longFld>\\d+),(?<binaryFld>\\w+),(?<stringFld>\\w+),(?<boolFld>\\w+)," +
+      "(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+\\.\\d*)";
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness();
+
+  /** Creates a table with one column per named group in {@link #TEST_REGEXP}. */
+  private KuduTable createNewTable(String tableName) throws Exception {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(10);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("byteFld", Type.INT8).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("shortFld", Type.INT16).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("intFld", Type.INT32).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("longFld", Type.INT64).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("binaryFld", Type.BINARY).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("boolFld", Type.BOOL).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("floatFld", Type.FLOAT).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleFld", Type.DOUBLE).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
+    return harness.getClient().createTable(tableName, new Schema(columns), createOptions);
+  }
+
+  @Test
+  public void testEmptyChannel() throws Exception {
+    testEvents(0, 1, "insert");
+  }
+
+  @Test
+  public void testOneEvent() throws Exception {
+    testEvents(1, 1, "insert");
+  }
+
+  @Test
+  public void testThreeEvents() throws Exception {
+    testEvents(3, 1, "insert");
+  }
+
+  @Test
+  public void testThreeEventsWithUpsert() throws Exception {
+    testEvents(3, 1, "upsert");
+  }
+
+  @Test
+  public void testOneEventTwoRowsEach() throws Exception {
+    testEvents(1, 2, "insert");
+  }
+
+  @Test
+  public void testTwoEventsTwoRowsEach() throws Exception {
+    testEvents(2, 2, "insert");
+  }
+
+  @Test
+  public void testTwoEventsTwoRowsEachWithUpsert() throws Exception {
+    testEvents(2, 2, "upsert");
+  }
+
+  /**
+   * Sends {@code eventCount} events of {@code perEventRowCount} rows each through a sink using
+   * {@link RegexpKuduOperationsProducer} and compares every stored row to the expected text.
+   */
+  private void testEvents(int eventCount, int perEventRowCount, String operation) throws Exception {
+    String tableName = String.format("test%sevents%srowseach%s",
+        eventCount, perEventRowCount, operation);
+    Context context = new Context();
+    context.put(KuduSinkConfigurationConstants.PRODUCER_PREFIX +
+        RegexpKuduOperationsProducer.PATTERN_PROP, TEST_REGEXP);
+    context.put(KuduSinkConfigurationConstants.PRODUCER_PREFIX +
+        RegexpKuduOperationsProducer.OPERATION_PROP, operation);
+    context.put(KuduSinkConfigurationConstants.PRODUCER,
+        RegexpKuduOperationsProducer.class.getName());
+    KuduTable table = createNewTable(tableName);
+
+    List<Event> events = generateEvents(eventCount, perEventRowCount, operation);
+
+    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
+
+    List<String> rows = scanTableToStrings(table);
+    assertEquals(eventCount * perEventRowCount + " row(s) expected",
+        eventCount * perEventRowCount,
+        rows.size());
+
+    ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount);
+    for (int i = 0; i < eventCount; i++) {
+      for (int j = 0; j < perEventRowCount; j++) {
+        // With upsert, every row of the first event is overwritten with value 1 by the extra
+        // event that generateEvents appends.
+        int value = "upsert".equals(operation) && i == 0 ? 1 : i;
+        String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " +
+            "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " +
+            "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " +
+            "DOUBLE doubleFld=%1$d.%1$d";
+        String rightAnswer = String.format(baseAnswer, value, i, j);
+        rightAnswers.add(rightAnswer);
+      }
+    }
+    Collections.sort(rightAnswers);
+
+    for (int k = 0; k < eventCount * perEventRowCount; k++) {
+      assertEquals("incorrect row", rightAnswers.get(k), rows.get(k));
+    }
+  }
+
+  /**
+   * Builds {@code eventCount} events, each holding {@code perEventRowCount} '|'-delimited rows
+   * keyed "1{i}{j}1". Each row ends with a third decimal value that {@link #TEST_REGEXP} does
+   * not capture. For upsert, and for any non-empty batch, extra events are appended (see
+   * below).
+   */
+  private List<Event> generateEvents(int eventCount, int perEventRowCount, String operation) {
+    List<Event> events = new ArrayList<>();
+
+    for (int i = 0; i < eventCount; i++) {
+      StringBuilder payload = new StringBuilder();
+      for (int j = 0; j < perEventRowCount; j++) {
+        String baseRow = "|1%1$d%2$d1,%1$d,%1$d,%1$d,%1$d,binary," +
+            "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d|";
+        String row = String.format(baseRow, i, j);
+        payload.append(row);
+      }
+      Event e = EventBuilder.withBody(payload.toString().getBytes(UTF_8));
+      events.add(e);
+    }
+
+    if (eventCount > 0) {
+      // In the upsert case, append one extra event that re-sends every row of the first
+      // event (i == 0, keys "10{j}1") with all non-key values set to 1, so those rows are
+      // overwritten.
+      if ("upsert".equals(operation)) {
+        StringBuilder upserts = new StringBuilder();
+        for (int j = 0; j < perEventRowCount; j++) {
+          String row = String.format("|1%2$d%3$d1,%1$d,%1$d,%1$d,%1$d,binary," +
+              "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d|", 1, 0, j);
+          upserts.append(row);
+        }
+        Event e = EventBuilder.withBody(upserts.toString().getBytes(UTF_8));
+        events.add(e);
+      }
+
+      // Also check some bad/corner cases: neither payload should produce a row.
+      String mismatchInInt = "|1,2,taco,4,5,x,y,true,1.0.2.0,999|";
+      String emptyString = "";
+      String[] testCases = {mismatchInInt, emptyString};
+      for (String testCase : testCases) {
+        Event e = EventBuilder.withBody(testCase.getBytes(UTF_8));
+        events.add(e);
+      }
+    }
+    return events;
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducerParseError.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducerParseError.java
new file mode 100644
index 0000000..e4af76f
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestRegexpKuduOperationsProducerParseError.java
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.flume.sink.kudu;
+
+import static org.apache.flume.sink.kudu.RegexpKuduOperationsProducer.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.RuleChain;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.test.CapturingLogAppender;
+
+/**
+ * Exercises the error-handling policies of {@link RegexpKuduOperationsProducer} for the three
+ * kinds of parse problems it reports:
+ * rows that do not match the pattern, rows lacking a group for a column, and column values
+ * that cannot be converted to the column type.
+ *
+ * <p>Each kind is tested with:
+ * <ul>
+ *   <li>the REJECT / WARN / IGNORE policies;</li>
+ *   <li>the deprecated boolean properties;</li>
+ *   <li>the default configuration;</li>
+ *   <li>conflicting or unknown settings.</li>
+ * </ul>
+ */
+public class TestRegexpKuduOperationsProducerParseError {
+  private static final String TEST_REGEXP = "(?<key>\\d+),(?<byteFld>\\d+),(?<stringFld>\\w+)";
+  // Lacks the stringFld group, so rows matched with it are missing that column.
+  private static final String TEST_REGEXP_MISSING_COLUMN = "(?<key>\\d+),(?<byteFld>\\d+)";
+  private static final String TEST_OPERATION = "insert";
+
+  private static final String ROW_UNMATCHING = "invalid row";
+  // 1000 is outside the range of the INT8 column byteFld.
+  private static final String ROW_BAD_COLUMN_VALUE = "1,1000,string";
+  private static final String ROW_MISSING_COLUMN = "1,1";
+
+  private static final String ERROR_MSG_UNMATCHED_ROW =
+      "Failed to match the pattern '" + TEST_REGEXP + "' in '" + ROW_UNMATCHING + "'";
+  private static final String ERROR_MSG_MISSING_COLUMN =
+      "Column 'stringFld' has no matching group in '" + ROW_MISSING_COLUMN + "'";
+  private static final String ERROR_MSG_BAD_COLUMN_VALUE =
+      "Raw value '" + ROW_BAD_COLUMN_VALUE +
+      "' couldn't be parsed to type Type: int8 for column 'byteFld'";
+
+  private static final String POLICY_REJECT = "REJECT";
+  private static final String POLICY_WARN = "WARN";
+  private static final String POLICY_IGNORE = "IGNORE";
+
+  public KuduTestHarness harness = new KuduTestHarness();
+  public ExpectedException thrown = ExpectedException.none();
+
+  // ExpectedException misbehaves when combined with other rules; we use a
+  // RuleChain to beat it into submission.
+  //
+  // See https://stackoverflow.com/q/28846088 for more information.
+  @Rule
+  public RuleChain chain = RuleChain.outerRule(harness).around(thrown);
+
+  @Test
+  public void testMissingColumnThrowsExceptionDefaultConfig() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnThrowsExceptionDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(false));
+    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnThrowsException() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_REJECT);
+    testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnLogsWarningDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(true));
+    testLogging(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnLogsWarning() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_WARN);
+    testLogging(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  @Test
+  public void testMissingColumnIgnored() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_IGNORE);
+    testIgnored(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
+  }
+
+  // Setting both the deprecated property and the policy property is rejected.
+  @Test(expected = IllegalArgumentException.class)
+  public void testMissingColumnConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(false));
+    additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_IGNORE);
+    getProducer(additionalContext);
+  }
+
+  @Test
+  public void testBadColumnValueThrowsExceptionDefaultConfig() throws Exception {
+    Context additionalContext = new Context();
+    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueThrowsExceptionDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(false));
+    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueThrowsException() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_REJECT);
+    testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueLogsWarningDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(true));
+    testLogging(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueLogsWarning() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_WARN);
+    testLogging(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  @Test
+  public void testBadColumnValueIgnored() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_IGNORE);
+    testIgnored(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
+  }
+
+  // Setting both the deprecated property and the policy property is rejected.
+  @Test(expected = IllegalArgumentException.class)
+  public void testBadColumnValueConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(false));
+    additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_IGNORE);
+    getProducer(additionalContext);
+  }
+
+  @Test
+  public void testUnmatchedRowLogsWarningWithDefaultConfig() throws Exception {
+    Context additionalContext = new Context();
+    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowThrowsException() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_REJECT);
+    testThrowsException(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowLogsWarningDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(true));
+    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowLogsWarning() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_WARN);
+    testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowIgnoredDeprecated() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(false));
+    testIgnored(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  @Test
+  public void testUnmatchedRowIgnored() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_IGNORE);
+    testIgnored(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
+  }
+
+  // Setting both the deprecated property and the policy property is rejected.
+  @Test(expected = IllegalArgumentException.class)
+  public void testUnmatchedRowConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(false));
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_IGNORE);
+    getProducer(additionalContext);
+  }
+
+  // A policy name outside REJECT/WARN/IGNORE is rejected at configuration time.
+  @Test(expected = IllegalArgumentException.class)
+  public void testUnKnownPolicyConfigValidation() throws Exception {
+    Context additionalContext = new Context();
+    additionalContext.put(UNMATCHED_ROW_POLICY_PROP, "FORCED");
+    getProducer(additionalContext);
+  }
+
+  /**
+   * Asserts that processing {@code eventBody} completes without throwing and that the captured
+   * log output contains {@code expectedError}.
+   */
+  private void testLogging(
+      Context additionalContext, String expectedError, String eventBody) throws Exception {
+    String appendedText = processEvent(additionalContext, eventBody);
+    assertTrue("Expected log output to contain: " + expectedError +
+        "\nActual log output: " + appendedText, appendedText.contains(expectedError));
+  }
+
+  /**
+   * Asserts that processing {@code eventBody} completes without throwing and that the captured
+   * log output does NOT contain {@code expectedError}.
+   */
+  private void testIgnored(
+      Context additionalContext, String expectedError, String eventBody) throws Exception {
+    String appendedText = processEvent(additionalContext, eventBody);
+    assertFalse("Expected log output not to contain: " + expectedError +
+        "\nActual log output: " + appendedText, appendedText.contains(expectedError));
+  }
+
+  /**
+   * Asserts that processing {@code eventBody} throws a {@link FlumeException} whose message
+   * contains {@code expectedError}.
+   */
+  private void testThrowsException(
+      Context additionalContext, String expectedError, String eventBody) throws Exception {
+    thrown.expect(FlumeException.class);
+    thrown.expectMessage(expectedError);
+    processEvent(additionalContext, eventBody);
+  }
+
+  /**
+   * Builds a producer and feeds it one event with the given UTF-8 body.
+   *
+   * @return the log text captured while {@code getOperations} ran. Output produced while the
+   *     producer was being configured is not captured.
+   */
+  private String processEvent(Context additionalContext, String eventBody) throws Exception {
+    CapturingLogAppender appender = new CapturingLogAppender();
+    RegexpKuduOperationsProducer producer = getProducer(additionalContext);
+    try (Closeable c = appender.attach()) {
+      producer.getOperations(EventBuilder.withBody(eventBody.getBytes(Charset.forName("UTF-8"))));
+    }
+    return appender.getAppendedText();
+  }
+
+  /**
+   * Creates a fresh "test" table and a producer for it. The producer is configured with
+   * {@link #TEST_REGEXP} and {@link #TEST_OPERATION}; entries in {@code additionalContext}
+   * are applied afterwards and therefore override those defaults.
+   */
+  private RegexpKuduOperationsProducer getProducer(Context additionalContext) throws Exception {
+    RegexpKuduOperationsProducer producer = new RegexpKuduOperationsProducer();
+    producer.initialize(createNewTable("test"));
+    Context context = new Context();
+    context.put(PATTERN_PROP, TEST_REGEXP);
+    context.put(OPERATION_PROP, TEST_OPERATION);
+    context.putAll(additionalContext.getParameters());
+    producer.configure(context);
+
+    return producer;
+  }
+
+  /**
+   * Creates a table with the columns key (INT32, primary key), byteFld (INT8) and
+   * stringFld (STRING), hash-partitioned three ways on key, with a single replica.
+   */
+  private KuduTable createNewTable(String tableName) throws Exception {
+    ArrayList<ColumnSchema> columns = new ArrayList<>(3);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("byteFld", Type.INT8).build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
+    CreateTableOptions createOptions = new CreateTableOptions()
+        .addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
+    return harness.getClient().createTable(tableName, new Schema(columns), createOptions);
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestSecureKuduSink.java b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestSecureKuduSink.java
new file mode 100644
index 0000000..0b5b4ac
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/java/org/apache/flume/sink/kudu/TestSecureKuduSink.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.kudu;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
+import static org.apache.kudu.util.SecurityUtil.KUDU_TICKETCACHE_PROPERTY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kudu.test.KuduTestHarness;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
+
+/**
+ * Runs a {@link KuduSink} against a Kerberos-enabled mini Kudu cluster whose KDC issues
+ * short-lived tickets. Events are written both before and after the renewable ticket lifetime
+ * has elapsed, so the sink has to obtain a new ticket on its own for the second half to succeed.
+ */
+public class TestSecureKuduSink {
+  private static final Logger LOG = LoggerFactory.getLogger(TestSecureKuduSink.class);
+  private static final int TICKET_LIFETIME_SECONDS = 20;
+  private static final int RENEWABLE_LIFETIME_SECONDS = 35;
+
+  private static final MiniKuduClusterBuilder clusterBuilder =
+      KuduTestHarness.getBaseClusterBuilder()
+          .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s")
+          .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s")
+          .enableKerberos();
+
+  @Rule
+  public KuduTestHarness harness = new KuduTestHarness(clusterBuilder);
+
+  @Before
+  public void clearTicketCacheProperty() {
+    // Let Flume authenticate.
+    System.clearProperty(KUDU_TICKETCACHE_PROPERTY);
+  }
+
+  @Test
+  public void testEventsWithShortTickets() throws Exception {
+    LOG.info("Creating new table...");
+    ArrayList<ColumnSchema> columns = new ArrayList<>(1);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY)
+        .key(true).build());
+    CreateTableOptions createOptions =
+        new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
+        .setNumReplicas(1);
+    String tableName = "test_long_lived_events";
+    KuduTable table = harness.getClient().createTable(tableName, new Schema(columns),
+        createOptions);
+    LOG.info("Created new table.");
+
+    KuduSink sink = KuduSinkTestUtil.createSecureSink(
+        tableName, harness.getMasterAddressesAsString(), harness.getClusterRoot());
+    sink.start();
+    try {
+      LOG.info("Testing events at the beginning.");
+      int eventCount = 10;
+
+      processEvents(sink, 0, eventCount / 2);
+      // The first batch has already gone through the secured cluster, so whatever ticket it
+      // used was obtained before this instant. Measuring from here, rather than from the start
+      // of the test, guarantees a lower bound on the ticket's age after the sleep.
+      Instant ticketInUseSince = Instant.now();
+
+      LOG.info("Waiting for tickets to expire");
+      // Duration.between(a, b) is b - a, i.e. the (non-negative) time elapsed since a.
+      Duration elapsedSoFar = Duration.between(ticketInUseSince, Instant.now());
+      long remainingMillis =
+          TimeUnit.SECONDS.toMillis(RENEWABLE_LIFETIME_SECONDS + 1) - elapsedSoFar.toMillis();
+      TimeUnit.MILLISECONDS.sleep(Math.max(0L, remainingMillis));
+      // At this point, the ticket will have been outstanding for at least
+      // (RENEWABLE_LIFETIME_SECONDS + 1) seconds-- so the sink will need to reacquire a ticket.
+
+      LOG.info("Testing events after ticket renewal.");
+      processEvents(sink, eventCount / 2, eventCount);
+
+      List<String> rows = scanTableToStrings(table);
+      assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
+
+      for (int i = 0; i < eventCount; i++) {
+        assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
+      }
+
+      LOG.info("Testing {} events finished successfully.", eventCount);
+    } finally {
+      // Release the sink's resources even when an assertion above fails.
+      sink.stop();
+    }
+  }
+
+  /**
+   * Sends one event per index in {@code [from, to)} through {@code sink}. Event {@code i}
+   * has the UTF-8 body "payload body i".
+   */
+  private void processEvents(KuduSink sink, int from, int to) throws EventDeliveryException {
+    List<Event> events = new ArrayList<>();
+    for (int i = from; i < to; i++) {
+      Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
+      events.add(e);
+    }
+
+    KuduSinkTestUtil.processEvents(sink, events);
+    LOG.info("Events flushed.");
+  }
+}
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/log4j2.xml b/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..45542ba
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<!-- status="OFF" silences Log4j's own internal status output. -->
+<Configuration status="OFF">
+  <Appenders>
+    <!-- The only appender: timestamp, thread, level, caller location, then the message. -->
+    <Console name="Console"
+             target="SYSTEM_OUT">
+      <PatternLayout pattern="%d (%t) [%p - %l] %m%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <!-- Verbose output for Flume and the Kudu client. These loggers declare no appender,
+         so their events reach the Console appender through the root logger. -->
+    <Logger name="org.apache.flume" level="DEBUG"/>
+    <Logger name="org.apache.kudu" level="DEBUG"/>
+    <!-- Everything else is logged at INFO and above. -->
+    <Root level="INFO">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/testAvroKuduOperationsProducer.avsc b/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/testAvroKuduOperationsProducer.avsc
new file mode 100644
index 0000000..d1f660e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kudu-sink/src/test/resources/testAvroKuduOperationsProducer.avsc
@@ -0,0 +1,11 @@
+{"namespace": "org.apache.flume.sink.kudu",
+ "type": "record",
+ "name": "AvroKuduOperationsProducerTestRecord",
+ "fields": [
+ {"name": "key", "type": "int"},
+ {"name": "longField", "type": "long"},
+ {"name": "doubleField", "type": "double"},
+ {"name": "nullableField", "type": ["string", "null"]},
+ {"name": "stringField", "type": "string"}
+ ]
+}
\ No newline at end of file
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index 99d2126..c2fd7e1 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -39,6 +39,7 @@ limitations under the License.
<module>flume-ng-elasticsearch-sink</module>
<module>flume-ng-morphline-solr-sink</module>
<module>flume-ng-kafka-sink</module>
+ <module>flume-ng-kudu-sink</module>
<module>flume-http-sink</module>
<module>flume-dataset-sink</module>
<module>flume-hive-sink</module>
diff --git a/pom.xml b/pom.xml
index be7edbc..ce71aa9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@ limitations under the License.
<hbase.version>1.0.0</hbase.version>
<hbase2.jetty.version>9.3.19.v20170502</hbase2.jetty.version>
<hbase2.version>2.0.0</hbase2.version>
+ <hive.version>1.0.0</hive.version>
<httpcore.version>4.4.6</httpcore.version>
<httpclient.version>4.5.3</httpclient.version>
<irclib.version>1.10</irclib.version>
@@ -81,9 +82,9 @@ limitations under the License.
<junit.version>4.10</junit.version>
<kafka.version>2.0.1</kafka.version>
<kite.version>1.0.0</kite.version>
- <hive.version>1.0.0</hive.version>
+ <kudu.version>1.10.0</kudu.version>
<lifecycle-mapping.version>1.0.0</lifecycle-mapping.version>
- <log4j.version>2.10.0</log4j.version>
+ <log4j.version>2.11.2</log4j.version>
<mapdb.version>0.9.9</mapdb.version>
<mina.version>2.0.4</mina.version>
<mockito.version>1.9.0</mockito.version>