You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/11/13 18:32:20 UTC
[kudu] branch master updated: Remove Flume integration module
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new c8d36a9 Remove Flume integration module
c8d36a9 is described below
commit c8d36a9f9424c8bb8ef20e86a576caea818689b0
Author: Grant Henke <gr...@apache.org>
AuthorDate: Wed Sep 18 13:30:02 2019 -0500
Remove Flume integration module
This patch removes the Flume integration now that it has been moved to
the Apache Flume project.
Jira: https://issues.apache.org/jira/browse/FLUME-3345
Commit: https://github.com/apache/flume/commit/9dafe98972a652aa1a202c6df9c08139d2bea592
Change-Id: I56ebc975d99ea49d36c5afc02fa4914b360ee082
Reviewed-on: http://gerrit.cloudera.org:8080/14255
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
java/buildSrc/build.gradle | 1 -
java/gradle/dependencies.gradle | 3 -
java/kudu-flume-sink/build.gradle | 44 ---
.../flume/sink/AvroKuduOperationsProducer.java | 309 ----------------
.../kudu/flume/sink/KuduOperationsProducer.java | 58 ---
.../java/org/apache/kudu/flume/sink/KuduSink.java | 325 -----------------
.../flume/sink/KuduSinkConfigurationConstants.java | 82 -----
.../flume/sink/RegexpKuduOperationsProducer.java | 405 ---------------------
.../sink/SimpleKeyedKuduOperationsProducer.java | 138 -------
.../flume/sink/SimpleKuduOperationsProducer.java | 95 -----
.../flume/sink/AvroKuduOperationsProducerTest.java | 191 ----------
.../sink/KeyedKuduOperationsProducerTest.java | 187 ----------
.../org/apache/kudu/flume/sink/KuduSinkTest.java | 210 -----------
.../apache/kudu/flume/sink/KuduSinkTestUtil.java | 102 ------
...RegexpKuduOperationsProducerParseErrorTest.java | 297 ---------------
.../sink/RegexpKuduOperationsProducerTest.java | 188 ----------
.../apache/kudu/flume/sink/SecureKuduSinkTest.java | 122 -------
.../src/test/resources/log4j2.properties | 32 --
.../resources/testAvroKuduOperationsProducer.avsc | 14 -
java/settings.gradle | 1 -
20 files changed, 2804 deletions(-)
diff --git a/java/buildSrc/build.gradle b/java/buildSrc/build.gradle
index 2cb860c..7560ecd 100644
--- a/java/buildSrc/build.gradle
+++ b/java/buildSrc/build.gradle
@@ -28,7 +28,6 @@ repositories {
// Manage plugin dependencies since the plugin block can't be used in included build scripts yet.
// For more details see: https://docs.gradle.org/current/userguide/plugins.html#plugins_dsl_limitations
dependencies {
- compile "com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0"
compile "com.github.ben-manes:gradle-versions-plugin:0.21.0"
compile "com.github.jengelman.gradle.plugins:shadow:5.0.0"
compile "gradle.plugin.com.google.gradle:osdetector-gradle-plugin:1.6.2"
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index 221e055..7471cec 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -32,7 +32,6 @@ versions += [
commonsIo : "2.6",
errorProne : "2.3.3",
errorProneJavac: "9+181-r4173-1",
- flume : "1.9.0",
gradle : "5.4.1",
guava : "27.1-jre",
hadoop : "3.2.0",
@@ -80,8 +79,6 @@ libs += [
commonsIo : "commons-io:commons-io:$versions.commonsIo",
errorProne : "com.google.errorprone:error_prone_core:$versions.errorProne",
errorProneJavac : "com.google.errorprone:javac:$versions.errorProneJavac",
- flumeConfiguration : "org.apache.flume:flume-ng-configuration:$versions.flume",
- flumeCore : "org.apache.flume:flume-ng-core:$versions.flume",
guava : "com.google.guava:guava:$versions.guava",
hadoopClient : "org.apache.hadoop:hadoop-client:$versions.hadoop",
hadoopCommon : "org.apache.hadoop:hadoop-common:$versions.hadoop",
diff --git a/java/kudu-flume-sink/build.gradle b/java/kudu-flume-sink/build.gradle
deleted file mode 100644
index cd61cd1..0000000
--- a/java/kudu-flume-sink/build.gradle
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Add the Avro plugin to support code generation from schema files.
-apply from: "$rootDir/gradle/shadow.gradle"
-apply plugin: "com.commercehub.gradle.plugin.avro"
-
-dependencies {
- compile project(path: ":kudu-client", configuration: "shadow")
-
- optional libs.yetusAnnotations
-
- provided libs.avro
- provided libs.flumeConfiguration
- provided libs.flumeCore
- provided libs.guava
- provided libs.hadoopCommon
- provided libs.slf4jApi
-
- testCompile project(path: ":kudu-test-utils", configuration: "shadow")
- testCompile libs.junit
- testCompile libs.log4j
- testCompile libs.log4jSlf4jImpl
-}
-
-// Configure the Avro plugin to compile the schemas in the
-// resource directory so the schemas are available in tests.
-generateTestAvroJava {
- source("src/test/resources")
-}
\ No newline at end of file
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
deleted file mode 100644
index f9a7ddc..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducer.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.net.URI;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kudu.Type;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-
-/**
- * An Avro serializer that generates one operation per event by deserializing the event
- * body as an Avro record and mapping its fields to columns in a Kudu table.
- *
- * <p><strong>Avro Kudu Operations Producer configuration parameters</strong>
- * <table cellpadding=3 cellspacing=0 border=1 summary="Avro Kudu Operations Producer configuration parameters">
- * <tr><th>Property Name</th>
- * <th>Default</th>
- * <th>Required?</th>
- * <th>Description</th></tr>
- * <tr>
- * <td>producer.operation</td>
- * <td>upsert</td>
- * <td>No</td>
- * <td>The operation used to write events to Kudu.
- * Supported operations are 'insert' and 'upsert'</td>
- * </tr>
- * <tr>
- * <td>producer.schemaPath</td>
- * <td></td>
- * <td>No</td>
- * <td>The location of the Avro schema file used to deserialize the Avro-encoded event bodies.
- * It's used whenever an event does not include its own schema. If not specified, the
- * schema must be specified on a per-event basis, either by url or as a literal.
- * Schemas must be record type.</td>
- * </tr>
- * </table>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class AvroKuduOperationsProducer implements KuduOperationsProducer {
- public static final String OPERATION_PROP = "operation";
- public static final String SCHEMA_PROP = "schemaPath";
- public static final String DEFAULT_OPERATION = "upsert";
- public static final String SCHEMA_URL_HEADER = "flume.avro.schema.url";
- public static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";
-
- private String operation;
- private GenericRecord reuse;
- private KuduTable table;
- private String defaultSchemaUrl;
-
- /**
- * The binary decoder to reuse for event parsing.
- */
- private BinaryDecoder decoder = null;
-
- /**
- * A cache of schemas retrieved by URL to avoid re-parsing the schema.
- */
- private static final LoadingCache<String, Schema> schemasFromURL =
- CacheBuilder.newBuilder()
- .build(new CacheLoader<String, Schema>() {
- @Override
- public Schema load(String url) throws IOException {
- Schema.Parser parser = new Schema.Parser();
- InputStream is = null;
- try {
- FileSystem fs = FileSystem.get(URI.create(url), conf);
- if (url.toLowerCase(Locale.ENGLISH).startsWith("hdfs:/")) {
- is = fs.open(new Path(url));
- } else {
- is = new URL(url).openStream();
- }
- return parser.parse(is);
- } finally {
- if (is != null) {
- is.close();
- }
- }
- }
- });
-
- /**
- * A cache of literal schemas to avoid re-parsing the schema.
- */
- private static final LoadingCache<String, Schema> schemasFromLiteral =
- CacheBuilder.newBuilder()
- .build(new CacheLoader<String, Schema>() {
- @Override
- public Schema load(String literal) {
- Preconditions.checkNotNull(literal,
- "Schema literal cannot be null without a Schema URL");
- return new Schema.Parser().parse(literal);
- }
- });
-
- /**
- * A cache of DatumReaders per schema.
- */
- private static final LoadingCache<Schema, DatumReader<GenericRecord>> readers =
- CacheBuilder.newBuilder()
- .build(new CacheLoader<Schema, DatumReader<GenericRecord>>() {
- @Override
- public DatumReader<GenericRecord> load(Schema schema) {
- return new GenericDatumReader<>(schema);
- }
- });
-
- private static final Configuration conf = new Configuration();
-
- public AvroKuduOperationsProducer() {
- }
-
- @Override
- public void configure(Context context) {
- this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION);
-
- String schemaPath = context.getString(SCHEMA_PROP);
- if (schemaPath != null) {
- defaultSchemaUrl = schemaPath;
- }
- }
-
- @Override
- public void initialize(KuduTable table) {
- this.table = table;
- }
-
- @Override
- public List<Operation> getOperations(Event event) throws FlumeException {
- Schema schema = getSchema(event);
- DatumReader<GenericRecord> reader = readers.getUnchecked(schema);
- decoder = DecoderFactory.get().binaryDecoder(event.getBody(), decoder);
- try {
- reuse = reader.read(reuse, decoder);
- } catch (IOException e) {
- throw new FlumeException("Cannot deserialize event", e);
- }
- Operation op;
- switch (operation.toLowerCase()) {
- case "upsert":
- op = table.newUpsert();
- break;
- case "insert":
- op = table.newInsert();
- break;
- default:
- throw new FlumeException(String.format("Unexpected operation %s", operation));
- }
- setupOp(op, schema, reuse);
- return Collections.singletonList(op);
- }
-
- private void setupOp(Operation op, Schema schema, GenericRecord record) {
- PartialRow row = op.getRow();
- for (ColumnSchema col : table.getSchema().getColumns()) {
- String name = col.getName();
- Object value = record.get(name);
- if (value == null) {
- if (col.isNullable()) {
- row.setNull(name);
- } else {
- // leave unset for possible Kudu default
- }
- } else {
- // Avro doesn't support 8- or 16-bit integer types, but we'll allow them to be passed as
- // a larger type.
- try {
- switch (col.getType()) {
- case BOOL:
- row.addBoolean(name, (boolean) value);
- break;
- case INT8:
- row.addByte(name, (byte) value);
- break;
- case INT16:
- row.addShort(name, (short) value);
- break;
- case INT32:
- row.addInt(name, (int) value);
- break;
- case INT64: // Fall through
- case UNIXTIME_MICROS:
- row.addLong(name, (long) value);
- break;
- case FLOAT:
- row.addFloat(name, (float) value);
- break;
- case DOUBLE:
- row.addDouble(name, (double) value);
- break;
- case DECIMAL:
- row.addDecimal(name, getAvroBigDecimal(schema, name, value));
- break;
- case STRING:
- row.addString(name, value.toString());
- break;
- case BINARY:
- row.addBinary(name, (byte[]) value);
- break;
- default:
- throw new FlumeException(String.format(
- "Unrecognized type %s for column %s", col.getType().toString(), name));
- }
- } catch (ClassCastException e) {
- throw new FlumeException(
- String.format("Failed to coerce value for column '%s' to type %s",
- col.getName(),
- col.getType()), e);
- }
- }
- }
- }
-
- private BigDecimal getAvroBigDecimal(Schema schema, String name, Object value) {
- LogicalType logicalType = schema.getField(name).schema().getLogicalType();
- if (!(logicalType instanceof LogicalTypes.Decimal)) {
- throw new FlumeException(String.format(
- "Failed to coerce value for column '%s' to type %s",
- name,
- Type.DECIMAL));
- }
- int scale = ((LogicalTypes.Decimal) logicalType).getScale();
- BigInteger unscaledValue = new BigInteger(((ByteBuffer) value).array());
- return new BigDecimal(unscaledValue, scale);
- }
-
- private Schema getSchema(Event event) throws FlumeException {
- Map<String, String> headers = event.getHeaders();
- String schemaUrl = headers.get(SCHEMA_URL_HEADER);
- String schemaLiteral = headers.get(SCHEMA_LITERAL_HEADER);
- try {
- if (schemaUrl != null) {
- return schemasFromURL.get(schemaUrl);
- } else if (schemaLiteral != null) {
- return schemasFromLiteral.get(schemaLiteral);
- } else if (defaultSchemaUrl != null) {
- return schemasFromURL.get(defaultSchemaUrl);
- } else {
- throw new FlumeException(
- String.format("No schema for event. " +
- "Specify configuration property '%s' or event header '%s'",
- SCHEMA_PROP,
- SCHEMA_URL_HEADER));
- }
- } catch (ExecutionException e) {
- throw new FlumeException("Cannot get schema", e);
- } catch (UncheckedExecutionException e) {
- throw new FlumeException("Cannot parse schema", e);
- }
- }
-
- @Override
- public void close() {
- }
-}
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java
deleted file mode 100644
index d886b84..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduOperationsProducer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import java.util.List;
-
-import org.apache.flume.Event;
-import org.apache.flume.conf.Configurable;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-
-/**
- * Interface for an operations producer that produces Kudu Operations from
- * Flume events.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface KuduOperationsProducer extends Configurable, AutoCloseable {
- /**
- * Initializes the operations producer. Called between configure and
- * getOperations.
- * @param table the KuduTable used to create Kudu Operation objects
- */
- void initialize(KuduTable table);
-
- /**
- * Returns the operations that should be written to Kudu as a result of this event.
- * @param event Event to convert to one or more Operations
- * @return List of Operations that should be written to Kudu
- */
- List<Operation> getOperations(Event event);
-
- /**
- * Cleans up any state. Called when the sink is stopped.
- */
- @Override
- void close();
-}
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
deleted file mode 100644
index 969c712..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSink.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.BATCH_SIZE;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_KEYTAB;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PROXY_USER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TIMEOUT_MILLIS;
-
-import java.security.PrivilegedAction;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Transaction;
-import org.apache.flume.auth.FlumeAuthenticationUtil;
-import org.apache.flume.auth.PrivilegedExecutor;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.client.AsyncKuduClient;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.OperationResponse;
-import org.apache.kudu.client.SessionConfiguration;
-
-/**
- * A Flume sink that reads events from a channel and writes them to a Kudu table.
- *
- * <p><strong>Flume Kudu Sink configuration parameters</strong>
- *
- * <table cellpadding=3 cellspacing=0 border=1 summary="Flume Kudu Sink configuration parameters">
- * <tr><th>Property Name</th><th>Default</th><th>Required?</th><th>Description</th></tr>
- * <tr><td>channel</td><td></td><td>Yes</td><td>The name of the Flume channel to read.</td></tr>
- * <tr><td>type</td><td></td><td>Yes</td>
- * <td>Component name. Must be {@code org.apache.kudu.flume.sink.KuduSink}</td></tr>
- * <tr><td>masterAddresses</td><td></td><td>Yes</td>
- * <td>Comma-separated list of "host:port" Kudu master addresses.
- * The port is optional.</td></tr>
- * <tr><td>tableName</td><td></td><td>Yes</td>
- * <td>The name of the Kudu table to write to.</td></tr>
- * <tr><td>batchSize</td><td>1000</td><td>No</td>
- * <td>The maximum number of events the sink takes from the channel per transaction.</td></tr>
- * <tr><td>ignoreDuplicateRows</td><td>true</td>
- * <td>No</td><td>Whether to ignore duplicate primary key errors caused by inserts.</td></tr>
- * <tr><td>timeoutMillis</td><td>10000</td><td>No</td>
- * <td>Timeout period for Kudu write operations, in milliseconds.</td></tr>
- * <tr><td>producer</td><td>{@link SimpleKuduOperationsProducer}</td><td>No</td>
- * <td>The fully-qualified class name of the {@link KuduOperationsProducer}
- * the sink should use.</td></tr>
- * <tr><td>producer.*</td><td></td><td>(Varies by operations producer)</td>
- * <td>Configuration properties to pass to the operations producer implementation.</td></tr>
- * </table>
- *
- * <p><strong>Installation</strong>
- *
- * <p>After building the sink, in order to use it with Flume, place the file named
- * <tt>kudu-flume-sink-<em>VERSION</em>-jar-with-dependencies.jar</tt> in the
- * Flume <tt>plugins.d</tt> directory under <tt>kudu-flume-sink/lib/</tt>.
- *
- * <p>For detailed instructions on using Flume's plugins.d mechanism, please see the plugins.d
- * section of the <a href="https://flume.apache.org/FlumeUserGuide.html#the-plugins-d-directory">Flume User Guide</a>.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduSink extends AbstractSink implements Configurable {
- private static final Logger logger = LoggerFactory.getLogger(KuduSink.class);
- private static final int DEFAULT_BATCH_SIZE = 1000;
- private static final Long DEFAULT_TIMEOUT_MILLIS =
- AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS;
- private static final String DEFAULT_KUDU_OPERATION_PRODUCER =
- "org.apache.kudu.flume.sink.SimpleKuduOperationsProducer";
- private static final boolean DEFAULT_IGNORE_DUPLICATE_ROWS = true;
-
- private String masterAddresses;
- private String tableName;
- private int batchSize;
- private long timeoutMillis;
- private boolean ignoreDuplicateRows;
- private KuduTable table;
- private KuduSession session;
- private KuduClient client;
- private KuduOperationsProducer operationsProducer;
- private SinkCounter sinkCounter;
- private PrivilegedExecutor privilegedExecutor;
-
- public KuduSink() {
- this(null);
- }
-
- @InterfaceAudience.LimitedPrivate("Test")
- @InterfaceAudience.Private
- public KuduSink(KuduClient kuduClient) {
- this.client = kuduClient;
- }
-
- @Override
- public synchronized void start() {
- Preconditions.checkState(table == null && session == null,
- "Please call stop before calling start on an old instance.");
-
- // Client is not null only inside tests.
- if (client == null) {
- // Creating client with FlumeAuthenticator.
- client = privilegedExecutor.execute(
- new PrivilegedAction<KuduClient>() {
- @Override
- public KuduClient run() {
- return new KuduClient.KuduClientBuilder(masterAddresses).build();
- }
- }
- );
- }
- session = client.newSession();
- session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
- session.setTimeoutMillis(timeoutMillis);
- session.setIgnoreAllDuplicateRows(ignoreDuplicateRows);
- session.setMutationBufferSpace(batchSize);
-
- try {
- table = client.openTable(tableName);
- } catch (Exception ex) {
- sinkCounter.incrementConnectionFailedCount();
- String msg = String.format("Could not open Kudu table '%s'", tableName);
- logger.error(msg, ex);
- throw new FlumeException(msg, ex);
- }
- operationsProducer.initialize(table);
-
- super.start();
- sinkCounter.incrementConnectionCreatedCount();
- sinkCounter.start();
- }
-
- @Override
- public synchronized void stop() {
- Exception ex = null;
- try {
- operationsProducer.close();
- } catch (Exception e) {
- ex = e;
- logger.error("Error closing operations producer", e);
- }
- try {
- if (client != null) {
- client.shutdown();
- }
- } catch (Exception e) {
- ex = e;
- logger.error("Error closing client", e);
- }
- client = null;
- table = null;
- session = null;
-
- sinkCounter.incrementConnectionClosedCount();
- sinkCounter.stop();
- if (ex != null) {
- throw new FlumeException("Error stopping sink", ex);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void configure(Context context) {
- masterAddresses = context.getString(MASTER_ADDRESSES);
- Preconditions.checkNotNull(masterAddresses,
- "Missing master addresses. Please specify property '%s'.",
- MASTER_ADDRESSES);
-
- tableName = context.getString(TABLE_NAME);
- Preconditions.checkNotNull(tableName,
- "Missing table name. Please specify property '%s'",
- TABLE_NAME);
-
- batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
- timeoutMillis = context.getLong(TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS);
- ignoreDuplicateRows = context.getBoolean(IGNORE_DUPLICATE_ROWS, DEFAULT_IGNORE_DUPLICATE_ROWS);
- String operationProducerType = context.getString(PRODUCER);
- String kerberosPrincipal = context.getString(KERBEROS_PRINCIPAL);
- String kerberosKeytab = context.getString(KERBEROS_KEYTAB);
- String proxyUser = context.getString(PROXY_USER);
-
- privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(
- kerberosPrincipal, kerberosKeytab).proxyAs(proxyUser);
-
- // Check for operations producer, if null set default operations producer type.
- if (operationProducerType == null || operationProducerType.isEmpty()) {
- operationProducerType = DEFAULT_KUDU_OPERATION_PRODUCER;
- logger.warn("No Kudu operations producer provided, using default");
- }
-
- Context producerContext = new Context();
- producerContext.putAll(context.getSubProperties(
- KuduSinkConfigurationConstants.PRODUCER_PREFIX));
-
- try {
- Class<? extends KuduOperationsProducer> clazz =
- (Class<? extends KuduOperationsProducer>)
- Class.forName(operationProducerType);
- operationsProducer = clazz.getDeclaredConstructor().newInstance();
- operationsProducer.configure(producerContext);
- } catch (Exception e) {
- logger.error("Could not instantiate Kudu operations producer" , e);
- throw new RuntimeException(e);
- }
- sinkCounter = new SinkCounter(this.getName());
- }
-
- public KuduClient getClient() {
- return client;
- }
-
- @Override
- public Status process() throws EventDeliveryException {
- if (session.hasPendingOperations()) {
- // If for whatever reason we have pending operations, refuse to process
- // more and tell the caller to try again a bit later. We don't want to
- // pile on the KuduSession.
- return Status.BACKOFF;
- }
-
- Channel channel = getChannel();
- Transaction txn = channel.getTransaction();
-
- txn.begin();
-
- try {
- long txnEventCount = 0;
- for (; txnEventCount < batchSize; txnEventCount++) {
- Event event = channel.take();
- if (event == null) {
- break;
- }
-
- List<Operation> operations = operationsProducer.getOperations(event);
- for (Operation o : operations) {
- session.apply(o);
- }
- }
-
- logger.debug("Flushing {} events", txnEventCount);
- List<OperationResponse> responses = session.flush();
- if (responses != null) {
- for (OperationResponse response : responses) {
- // Throw an EventDeliveryException if at least one of the responses was
- // a row error. Row errors can occur for example when an event is inserted
- // into Kudu successfully but the Flume transaction is rolled back for some reason,
- // and a subsequent replay of the same Flume transaction leads to a
- // duplicate key error since the row already exists in Kudu.
- // Note: Duplicate keys will not be reported as errors if ignoreDuplicateRows
- // is enabled in the config.
- if (response.hasRowError()) {
- throw new EventDeliveryException("Failed to flush one or more changes. " +
- "Transaction rolled back: " + response.getRowError().toString());
- }
- }
- }
-
- if (txnEventCount == 0) {
- sinkCounter.incrementBatchEmptyCount();
- } else if (txnEventCount == batchSize) {
- sinkCounter.incrementBatchCompleteCount();
- } else {
- sinkCounter.incrementBatchUnderflowCount();
- }
-
- txn.commit();
-
- if (txnEventCount == 0) {
- return Status.BACKOFF;
- }
-
- sinkCounter.addToEventDrainSuccessCount(txnEventCount);
- return Status.READY;
-
- } catch (Throwable e) {
- txn.rollback();
-
- String msg = "Failed to commit transaction. Transaction rolled back.";
- logger.error(msg, e);
- if (e instanceof Error || e instanceof RuntimeException) {
- throw new RuntimeException(e);
- } else {
- logger.error(msg, e);
- throw new EventDeliveryException(msg, e);
- }
- } finally {
- txn.close();
- }
- }
-}
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
deleted file mode 100644
index dbb2f66..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/KuduSinkConfigurationConstants.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-/**
- * Property names used to configure the Kudu Flume sink.
- *
- * <p>Each constant holds only a property key; no default values are defined here.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class KuduSinkConfigurationConstants {
- /**
- * Comma-separated list of "host:port" Kudu master addresses.
- * The port is optional and defaults to the Kudu Java client's default master
- * port.
- */
- public static final String MASTER_ADDRESSES = "masterAddresses";
-
- /**
- * The name of the table in Kudu to write to.
- */
- public static final String TABLE_NAME = "tableName";
-
- /**
- * The fully qualified class name of the KuduOperationsProducer class that the
- * sink should use.
- */
- public static final String PRODUCER = "producer";
-
- /**
- * Prefix for configuration parameters that are passed to the
- * KuduOperationsProducer. Evaluates to {@code "producer."}.
- */
- public static final String PRODUCER_PREFIX = PRODUCER + ".";
-
- /**
- * Maximum number of events that the sink should take from the channel per
- * transaction.
- */
- public static final String BATCH_SIZE = "batchSize";
-
- /**
- * Timeout period for Kudu operations, in milliseconds.
- */
- public static final String TIMEOUT_MILLIS = "timeoutMillis";
-
- /**
- * Whether to ignore duplicate primary key errors caused by inserts.
- */
- public static final String IGNORE_DUPLICATE_ROWS = "ignoreDuplicateRows";
-
- /**
- * Path to the keytab file used for authentication.
- */
- public static final String KERBEROS_KEYTAB = "kerberosKeytab";
-
- /**
- * Kerberos principal used for authentication.
- */
- public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
-
- /**
- * The effective user, if different from the Kerberos principal.
- */
- public static final String PROXY_USER = "proxyUser";
-}
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
deleted file mode 100644
index a3b2c57..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducer.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import java.math.BigDecimal;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.Upsert;
-
-/**
- * A regular expression operations producer that generates one or more Kudu
- * {@link Insert} or {@link Upsert} operations per Flume {@link Event} by
- * parsing the event {@code body} using a regular expression. Values are
- * coerced to the types of the named columns in the Kudu table.
- *
- * <p>Example: If the Kudu table has the schema:
- *
- * <pre>
- * key INT32
- * name STRING</pre>
- *
- * <p>and {@code producer.pattern = (?<key>\\d+),(?<name>\\w+)} then
- * {@code RegexpKuduOperationsProducer} will parse the string:
- *
- * <pre>|12345,Mike||54321,Todd|</pre>
- *
- * into the rows: {@code (key=12345, name=Mike)} and {@code (key=54321, name=Todd)}.
- *
- * <p>Note: This class relies on JDK7 named capturing groups, which are
- * documented in {@link Pattern}. The name of each capturing group must
- * correspond to a column name in the destination Kudu table.
- *
- * <p><strong><code>RegexpKuduOperationsProducer</code> Flume Configuration Parameters</strong></p>
- *
- * <table cellpadding=3 cellspacing=0 border=1 summary="Flume Configuration Parameters">
- * <tr>
- * <th>Property Name</th>
- * <th>Default</th>
- * <th>Required?</th>
- * <th>Description</th>
- * </tr>
- * <tr>
- * <td>producer.pattern</td>
- * <td></td>
- * <td>Yes</td>
- * <td>The regular expression used to parse the event body.</td>
- * </tr>
- * <tr>
- * <td>producer.encoding</td>
- * <td>utf-8</td>
- * <td>No</td>
- * <td>The character set of the event body.</td>
- * </tr>
- * <tr>
- * <td>producer.operation</td>
- * <td>upsert</td>
- * <td>No</td>
- * <td>Operation type used to write the event to Kudu. Must be either
- * {@code insert} or {@code upsert}.</td>
- * </tr>
- * <tr>
- * <td>producer.skipMissingColumn</td>
- * <td>false</td>
- * <td>No</td>
- * <td>
- * <b>@deprecated</b> Use {@code producer.missingColumnPolicy} instead.<br/>
- * What to do if a column in the Kudu table has no corresponding capture group.
- * If set to {@code true}, a warning message is logged and the operation is still attempted.
- * If set to {@code false}, an exception is thrown and the sink will not process the
- * {@code Event}, causing a Flume {@code Channel} rollback.</td>
- * </tr>
- * <tr>
- * <td>producer.skipBadColumnValue</td>
- * <td>false</td>
- * <td>No</td>
- * <td>
- * <b>@deprecated</b> Use {@code producer.badColumnValuePolicy} instead.<br/>
- * What to do if a value in the pattern match cannot be coerced to the required type.
- * If set to {@code true}, a warning message is logged and the operation is still attempted.
- * If set to {@code false}, an exception is thrown and the sink will not process the
- * {@code Event}, causing a Flume {@code Channel} rollback.</td>
- * </tr>
- * <tr>
- * <td>producer.skipUnmatchedRows</td>
- * <td>true</td>
- * <td>No</td>
- * <td>
- * <b>@deprecated</b> Use {@code producer.unmatchedRowPolicy} instead.<br/>
- * Whether to log a warning about payloads that do not match the pattern. If set to
- * {@code false}, event bodies with no matches will be silently dropped.</td>
- * </tr>
- * <tr>
- * <td>producer.missingColumnPolicy</td>
- * <td>REJECT</td>
- * <td>No</td>
- * <td>What to do if a column in the Kudu table has no corresponding capture group.<br/>
- * If set to {@code REJECT}, an exception is thrown and the sink will not process the
- * {@code Event}, causing a Flume {@code Channel} rollback.<br/>
- * If set to {@code WARN}, a warning message is logged and the operation is still produced.<br/>
- * If set to {@code IGNORE}, the operation is still produced without any log message.</td>
- * </tr>
- * <tr>
- * <td>producer.badColumnValuePolicy</td>
- * <td>REJECT</td>
- * <td>No</td>
- * <td>What to do if a value in the pattern match cannot be coerced to the required type.<br/>
- * If set to {@code REJECT}, an exception is thrown and the sink will not process the
- * {@code Event}, causing a Flume {@code Channel} rollback.<br/>
- * If set to {@code WARN}, a warning message is logged and the operation is still produced,
- * but does not include the given column.<br/>
- * If set to {@code IGNORE}, the operation is still produced, but does not include the given
- * column and does not log any message.</td>
- * </tr>
- * <tr>
- * <td>producer.unmatchedRowPolicy</td>
- * <td>WARN</td>
- * <td>No</td>
- * <td>What to do if a payload does not match the pattern.<br/>
- * If set to {@code REJECT}, an exception is thrown and the sink will not process the
- * {@code Event}, causing a Flume {@code Channel} rollback.<br/>
- * If set to {@code WARN}, a warning message is logged and the row is skipped,
- * not producing an operation.<br/>
- * If set to {@code IGNORE}, the row is skipped without any log message.</td>
- * </tr>
- * </table>
- *
- * @see Pattern
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class RegexpKuduOperationsProducer implements KuduOperationsProducer {
- private static final Logger logger = LoggerFactory.getLogger(RegexpKuduOperationsProducer.class);
- // Operation names accepted for the "operation" property; configure() rejects anything else.
- private static final String INSERT = "insert";
- private static final String UPSERT = "upsert";
- private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT);
-
- // Property names read by configure(), each followed by its default where one exists.
- public static final String PATTERN_PROP = "pattern";
- public static final String ENCODING_PROP = "encoding";
- public static final String DEFAULT_ENCODING = "utf-8";
- public static final String OPERATION_PROP = "operation";
- public static final String DEFAULT_OPERATION = UPSERT;
- // Deprecated boolean properties; configure() translates them into ParseErrorPolicy values.
- @Deprecated
- public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
- @Deprecated
- public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
- @Deprecated
- public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
- @Deprecated
- public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
- // NOTE: the property key is "skipUnmatchedRows" although the constant is named
- // WARN_UNMATCHED_ROWS_PROP; configure() maps true to WARN and false to IGNORE.
- @Deprecated
- public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows";
- @Deprecated
- public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;
- // Policy properties that replace the deprecated booleans above.
- public static final String MISSING_COLUMN_POLICY_PROP = "missingColumnPolicy";
- public static final ParseErrorPolicy DEFAULT_MISSING_COLUMN_POLICY = ParseErrorPolicy.REJECT;
- public static final String BAD_COLUMN_VALUE_POLICY_PROP = "badColumnValuePolicy";
- public static final ParseErrorPolicy DEFAULT_BAD_COLUMN_VALUE_POLICY = ParseErrorPolicy.REJECT;
- public static final String UNMATCHED_ROW_POLICY_PROP = "unmatchedRowPolicy";
- public static final ParseErrorPolicy DEFAULT_UNMATCHED_ROW_POLICY = ParseErrorPolicy.WARN;
-
- // Set by initialize().
- private KuduTable table;
- // Set by configure().
- private Pattern pattern;
- private Charset charset;
- private String operation;
- private ParseErrorPolicy missingColumnPolicy;
- private ParseErrorPolicy badColumnValuePolicy;
- private ParseErrorPolicy unmatchedRowPolicy;
-
- /**
- * No-argument constructor; all state is assigned later by configure() and initialize().
- */
- public RegexpKuduOperationsProducer() {
- }
-
- /**
-  * Reads the producer settings from {@code context}: the regular expression, the
-  * event body charset, the write operation and the three parse-error policies.
-  *
-  * @param context the producer configuration
-  * @throws IllegalArgumentException if the pattern is missing or invalid, or if the
-  *     operation or one of the policies is not recognized
-  * @throws FlumeException if the configured charset is invalid or unsupported
-  */
- @Override
- public void configure(Context context) {
-   String regexp = context.getString(PATTERN_PROP);
-   Preconditions.checkArgument(regexp != null,
-       "Required parameter %s is not specified",
-       PATTERN_PROP);
-   try {
-     pattern = Pattern.compile(regexp);
-   } catch (PatternSyntaxException e) {
-     throw new IllegalArgumentException(
-         String.format("The pattern '%s' is invalid", regexp), e);
-   }
-   String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING);
-   try {
-     charset = Charset.forName(charsetName);
-   } catch (IllegalArgumentException e) {
-     throw new FlumeException(
-         String.format("Invalid or unsupported charset %s", charsetName), e);
-   }
-   // Locale.ROOT keeps the normalization independent of the JVM's default locale;
-   // with a Turkish default locale "INSERT" would otherwise not lower-case to "insert".
-   operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase(Locale.ROOT);
-   Preconditions.checkArgument(
-       validOperations.contains(operation),
-       "Unrecognized operation '%s'",
-       operation);
-
-   // Each policy may come from its deprecated boolean property or from the new one.
-   missingColumnPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
-       context, SKIP_MISSING_COLUMN_PROP, MISSING_COLUMN_POLICY_PROP,
-       ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_MISSING_COLUMN_POLICY
-   );
-
-   badColumnValuePolicy = getParseErrorPolicyCheckingDeprecatedProperty(
-       context, SKIP_BAD_COLUMN_VALUE_PROP, BAD_COLUMN_VALUE_POLICY_PROP,
-       ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_BAD_COLUMN_VALUE_POLICY
-   );
-
-   unmatchedRowPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
-       context, WARN_UNMATCHED_ROWS_PROP, UNMATCHED_ROW_POLICY_PROP,
-       ParseErrorPolicy.WARN, ParseErrorPolicy.IGNORE, DEFAULT_UNMATCHED_ROW_POLICY
-   );
- }
-
- /**
- * Stores the table whose schema and operation factories getOperations() uses.
- *
- * @param table the destination Kudu table
- */
- @Override
- public void initialize(KuduTable table) {
- this.table = table;
- }
-
- /**
-  * Builds one operation for every match of the configured pattern in the event body.
-  *
-  * <p>For each match, every column of the table schema is filled from the capture
-  * group named after it. A column without such a group is handled by the
-  * missing-column policy; a value that fails conversion, or that raises an
-  * {@link IllegalArgumentException} while being set, is handled by the
-  * bad-column-value policy. A body without any match is handled by the
-  * unmatched-row policy.
-  *
-  * @param event the Flume event whose body is parsed
-  * @return the operations produced, possibly empty
-  * @throws FlumeException if a policy is {@code REJECT} and its condition occurs, or
-  *     if building an operation fails for any other reason
-  */
- @Override
- public List<Operation> getOperations(Event event) throws FlumeException {
-   String raw = new String(event.getBody(), charset);
-   Matcher m = pattern.matcher(raw);
-   boolean match = false;
-   Schema schema = table.getSchema();
-   List<Operation> ops = Lists.newArrayList();
-   while (m.find()) {
-     match = true;
-     Operation op;
-     switch (operation) {
-       case UPSERT:
-         op = table.newUpsert();
-         break;
-       case INSERT:
-         op = table.newInsert();
-         break;
-       default:
-         throw new FlumeException(
-             String.format("Unrecognized operation type '%s' in getOperations(): " +
-                 "this should never happen!", operation));
-     }
-     PartialRow row = op.getRow();
-     for (ColumnSchema col : schema.getColumns()) {
-       String rawVal;
-       try {
-         // Matcher.group(String) throws IllegalArgumentException when the pattern
-         // declares no group with this name.
-         rawVal = m.group(col.getName());
-       } catch (IllegalArgumentException e) {
-         String msg = String.format(
-             "Column '%s' has no matching group in '%s'",
-             col.getName(), raw);
-         logOrThrow(missingColumnPolicy, msg, e);
-         continue;
-       }
-       try {
-         coerceAndSet(rawVal, col.getName(), col.getType(), row);
-       } catch (IllegalArgumentException e) {
-         // Includes NumberFormatException. The lookup above is kept separate so that
-         // a rejected value is never reported as a missing capture group.
-         String msg = String.format(
-             "Raw value '%s' couldn't be parsed to type %s for column '%s'",
-             raw, col.getType(), col.getName());
-         logOrThrow(badColumnValuePolicy, msg, e);
-       } catch (Exception e) {
-         throw new FlumeException("Failed to create Kudu operation", e);
-       }
-     }
-     ops.add(op);
-   }
-   if (!match) {
-     String msg = String.format("Failed to match the pattern '%s' in '%s'", pattern, raw);
-     logOrThrow(unmatchedRowPolicy, msg, null);
-   }
-   return ops;
- }
-
- /**
-  * Converts {@code rawVal} to the Java representation of {@code type} and stores
-  * it in {@code row} under {@code colName}. Types without a conversion here are
-  * skipped with a warning.
-  *
-  * @param rawVal the raw string column value
-  * @param colName the name of the column
-  * @param type the Kudu type to convert {@code rawVal} to
-  * @param row the row to set the value in
-  * @throws NumberFormatException if {@code rawVal} cannot be parsed as {@code type}
-  */
- private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row)
-     throws NumberFormatException {
-   switch (type) {
-     case STRING:
-       row.addString(colName, rawVal);
-       return;
-     case BINARY:
-       row.addBinary(colName, rawVal.getBytes(charset));
-       return;
-     case BOOL:
-       row.addBoolean(colName, Boolean.parseBoolean(rawVal));
-       return;
-     case INT8:
-       row.addByte(colName, Byte.parseByte(rawVal));
-       return;
-     case INT16:
-       row.addShort(colName, Short.parseShort(rawVal));
-       return;
-     case INT32:
-       row.addInt(colName, Integer.parseInt(rawVal));
-       return;
-     case UNIXTIME_MICROS: // Stored as a long, exactly like INT64.
-     case INT64:
-       row.addLong(colName, Long.parseLong(rawVal));
-       return;
-     case FLOAT:
-       row.addFloat(colName, Float.parseFloat(rawVal));
-       return;
-     case DOUBLE:
-       row.addDouble(colName, Double.parseDouble(rawVal));
-       return;
-     case DECIMAL:
-       row.addDecimal(colName, new BigDecimal(rawVal));
-       return;
-     default:
-       break;
-   }
-   // Only reached for a type none of the cases above handles.
-   logger.warn("got unknown type {} for column '{}'-- ignoring this column",
-       type, colName);
- }
-
- /**
-  * Applies {@code policy} to a parse problem: {@code REJECT} throws, {@code WARN}
-  * logs a warning, and any other value does nothing.
-  *
-  * @param policy the policy to apply
-  * @param msg the description of the problem
-  * @param e the underlying exception, or {@code null} if there is none
-  * @throws FlumeException if {@code policy} is {@code REJECT}
-  */
- private void logOrThrow(ParseErrorPolicy policy, String msg, Exception e)
-     throws FlumeException {
-   switch (policy) {
-     case WARN:
-       logger.warn(msg, e);
-       return;
-     case REJECT:
-       throw new FlumeException(msg, e);
-     default:
-       // IGNORE: stay silent.
-       return;
-   }
- }
-
- /**
- * Does nothing; the body is empty.
- */
- @Override
- public void close() {
- }
-
- /**
-  * Resolves a parse-error policy from either a deprecated boolean property or the
-  * policy property that replaces it.
-  *
-  * @param context the producer configuration
-  * @param deprecatedPropertyName the deprecated boolean property
-  * @param newPropertyName the replacement policy property
-  * @param trueValue the policy used when the deprecated property is {@code true}
-  * @param falseValue the policy used when the deprecated property is {@code false}
-  * @param defaultValue the policy used when neither property is set
-  * @return the resolved policy
-  * @throws IllegalArgumentException if both properties are set, or if the new
-  *     property does not name a {@code ParseErrorPolicy} value
-  */
- private ParseErrorPolicy getParseErrorPolicyCheckingDeprecatedProperty(
-     Context context, String deprecatedPropertyName, String newPropertyName,
-     ParseErrorPolicy trueValue, ParseErrorPolicy falseValue, ParseErrorPolicy defaultValue) {
-   if (context.containsKey(deprecatedPropertyName)) {
-     logger.info("Configuration property {} is deprecated. Use {} instead.",
-         deprecatedPropertyName, newPropertyName);
-     // Guava's Preconditions substitutes only %s placeholders, not SLF4J-style {}.
-     Preconditions.checkArgument(!context.containsKey(newPropertyName),
-         "Both %s and %s specified. Use only one of them, preferably %s.",
-         deprecatedPropertyName, newPropertyName, newPropertyName);
-     return context.getBoolean(deprecatedPropertyName) ? trueValue : falseValue;
-   }
-
-   String policyString = context.getString(newPropertyName, defaultValue.name());
-   try {
-     // Locale.ROOT keeps the lookup independent of the JVM's default locale; with a
-     // Turkish default locale "ignore" would otherwise not upper-case to "IGNORE".
-     return ParseErrorPolicy.valueOf(policyString.toUpperCase(Locale.ROOT));
-   } catch (IllegalArgumentException e) {
-     throw new IllegalArgumentException(
-         "Unknown policy '" + policyString + "'. Use one of the following: " +
-         Arrays.toString(ParseErrorPolicy.values()), e);
-   }
- }
-
- /**
- * How a parse problem is handled; interpreted by logOrThrow().
- */
- private enum ParseErrorPolicy {
- // Log the problem as a warning and continue.
- WARN,
- // Continue without logging.
- IGNORE,
- // Throw a FlumeException.
- REJECT
- }
-}
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java
deleted file mode 100644
index b421771..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKeyedKuduOperationsProducer.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.Upsert;
-
-/**
- * A simple serializer that generates one {@link Insert} or {@link Upsert}
- * per {@link Event} by writing the event body into a BINARY column. The pair
- * (key column name, key column value) should be a header in the {@link Event};
- * the column name is configurable but the column type must be STRING. Multiple
- * key columns are not supported.
- *
- * <p><strong>Simple Keyed Kudu Operations Producer configuration parameters</strong>
- *
- * <table cellpadding=3 cellspacing=0 border=1 summary="Simple Keyed Kudu Operations Producer configuration parameters">
- * <tr>
- * <th>Property Name</th>
- * <th>Default</th>
- * <th>Required?</th>
- * <th>Description</th>
- * </tr>
- * <tr>
- * <td>producer.payloadColumn</td>
- * <td>payload</td>
- * <td>No</td>
- * <td>The name of the BINARY column to write the Flume event body to.</td>
- * </tr>
- * <tr>
- * <td>producer.keyColumn</td>
- * <td>key</td>
- * <td>No</td>
- * <td>The name of the STRING key column of the target Kudu table.</td>
- * </tr>
- * <tr>
- * <td>producer.operation</td>
- * <td>upsert</td>
- * <td>No</td>
- * <td>The operation used to write events to Kudu. Supported operations
- * are 'insert' and 'upsert' (case-insensitive).</td>
- * </tr>
- * </table>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class SimpleKeyedKuduOperationsProducer implements KuduOperationsProducer {
-  public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
-  public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
-  public static final String KEY_COLUMN_PROP = "keyColumn";
-  public static final String KEY_COLUMN_DEFAULT = "key";
-  public static final String OPERATION_PROP = "operation";
-  public static final String OPERATION_DEFAULT = "upsert";
-
-  private KuduTable table;
-  private String payloadColumn;
-  private String keyColumn;
-  private String operation;
-
-  public SimpleKeyedKuduOperationsProducer(){
-  }
-
-  /**
-   * Reads the payload column, key column and operation names from {@code context},
-   * falling back to the {@code *_DEFAULT} constants.
-   *
-   * @param context the producer configuration
-   */
-  @Override
-  public void configure(Context context) {
-    payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
-    keyColumn = context.getString(KEY_COLUMN_PROP, KEY_COLUMN_DEFAULT);
-    operation = context.getString(OPERATION_PROP, OPERATION_DEFAULT);
-  }
-
-  /**
-   * Stores the table that operations are created for.
-   *
-   * @param table the destination Kudu table
-   */
-  @Override
-  public void initialize(KuduTable table) {
-    this.table = table;
-  }
-
-  /**
-   * Creates a single operation whose key column is taken from the event header
-   * named after the key column and whose payload column is the event body.
-   *
-   * @param event the Flume event to convert
-   * @return a one-element list holding the operation
-   * @throws FlumeException if the key header is absent, the configured operation is
-   *     not supported, or the operation cannot be built
-   */
-  @Override
-  public List<Operation> getOperations(Event event) throws FlumeException {
-    String key = event.getHeaders().get(keyColumn);
-    if (key == null) {
-      throw new FlumeException(
-          String.format("No value provided for key column %s", keyColumn));
-    }
-    try {
-      Operation op = newOperation();
-      PartialRow row = op.getRow();
-      row.addString(keyColumn, key);
-      row.addBinary(payloadColumn, event.getBody());
-
-      return Collections.singletonList(op);
-    } catch (FlumeException e) {
-      // Already carries a specific message (unsupported operation); rethrow as-is
-      // rather than burying it under the generic failure below.
-      throw e;
-    } catch (Exception e) {
-      throw new FlumeException("Failed to create Kudu Operation object", e);
-    }
-  }
-
-  /**
-   * Creates an empty operation of the configured kind. The comparison uses
-   * {@link String#equalsIgnoreCase(String)}, which does not depend on the default locale.
-   */
-  private Operation newOperation() {
-    if ("upsert".equalsIgnoreCase(operation)) {
-      return table.newUpsert();
-    }
-    if ("insert".equalsIgnoreCase(operation)) {
-      return table.newInsert();
-    }
-    throw new FlumeException(
-        String.format("Unexpected operation %s", operation));
-  }
-
-  @Override
-  public void close() {
-  }
-}
-
diff --git a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java b/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java
deleted file mode 100644
index acd74e1..0000000
--- a/java/kudu-flume-sink/src/main/java/org/apache/kudu/flume/sink/SimpleKuduOperationsProducer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.FlumeException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.PartialRow;
-
-/**
- * A minimal producer that turns every {@link Event} into exactly one
- * {@link Insert}, storing the event body in a BINARY column. Event headers
- * are not used.
- *
- * <p><strong>Simple Kudu Event Producer configuration parameters</strong>
- *
- * <table cellpadding=3 cellspacing=0 border=1 summary="Simple Kudu Event Producer configuration parameters">
- * <tr>
- * <th>Property Name</th>
- * <th>Default</th>
- * <th>Required?</th>
- * <th>Description</th>
- * </tr>
- * <tr>
- * <td>producer.payloadColumn</td>
- * <td>payload</td>
- * <td>No</td>
- * <td>The name of the BINARY column to write the Flume event body to.</td>
- * </tr>
- * </table>
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class SimpleKuduOperationsProducer implements KuduOperationsProducer {
-  public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
-  public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
-
-  private KuduTable table;
-  private String payloadColumn;
-
-  public SimpleKuduOperationsProducer() {
-  }
-
-  /** Reads the payload column name, defaulting to {@link #PAYLOAD_COLUMN_DEFAULT}. */
-  @Override
-  public void configure(Context context) {
-    payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
-  }
-
-  /** Stores the table that inserts are created for. */
-  @Override
-  public void initialize(KuduTable table) {
-    this.table = table;
-  }
-
-  /**
-   * Wraps the event body in a single insert.
-   *
-   * @param event the Flume event to convert
-   * @return a one-element list holding the insert
-   * @throws FlumeException if the insert cannot be built
-   */
-  @Override
-  public List<Operation> getOperations(Event event) throws FlumeException {
-    try {
-      Insert bodyInsert = table.newInsert();
-      PartialRow payloadRow = bodyInsert.getRow();
-      payloadRow.addBinary(payloadColumn, event.getBody());
-
-      return Collections.<Operation>singletonList(bodyInsert);
-    } catch (Exception e) {
-      throw new FlumeException("Failed to create Kudu Insert object", e);
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
deleted file mode 100644
index 5517f1a..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/AvroKuduOperationsProducerTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_LITERAL_HEADER;
-import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_PROP;
-import static org.apache.kudu.flume.sink.AvroKuduOperationsProducer.SCHEMA_URL_HEADER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.math.BigDecimal;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.test.KuduTestHarness;
-import org.apache.kudu.util.DecimalUtil;
-
-/**
- * Runs Avro-encoded events through the sink with {@link AvroKuduOperationsProducer}
- * and compares the rows scanned back from the table with the expected ones. The
- * Avro schema is supplied either through the producer configuration or through a
- * header on each event.
- */
-public class AvroKuduOperationsProducerTest {
-  private static String schemaUriString;
-  private static String schemaLiteral;
-
-  // Locate the schema resource once and keep both its URI and its text.
-  static {
-    try {
-      URL schemaUrl = AvroKuduOperationsProducerTest.class
-          .getResource("/testAvroKuduOperationsProducer.avsc");
-      File schemaFile = Paths.get(schemaUrl.toURI()).toFile();
-      schemaUriString = schemaFile.getAbsoluteFile().toURI().toString();
-      schemaLiteral = Files.asCharSource(schemaFile, UTF_8).read();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** Where the Avro schema is supplied from. */
-  enum SchemaLocation {
-    GLOBAL, URL, LITERAL
-  }
-
-  @Rule
-  public KuduTestHarness harness = new KuduTestHarness();
-
-  @Test
-  public void testEmptyChannel() throws Exception {
-    testEvents(0, SchemaLocation.GLOBAL);
-  }
-
-  @Test
-  public void testOneEvent() throws Exception {
-    testEvents(1, SchemaLocation.GLOBAL);
-  }
-
-  @Test
-  public void testThreeEvents() throws Exception {
-    testEvents(3, SchemaLocation.GLOBAL);
-  }
-
-  @Test
-  public void testThreeEventsSchemaURLInEvent() throws Exception {
-    testEvents(3, SchemaLocation.URL);
-  }
-
-  @Test
-  public void testThreeEventsSchemaLiteralInEvent() throws Exception {
-    testEvents(3, SchemaLocation.LITERAL);
-  }
-
-  /** Creates a table, pushes {@code eventCount} events through a sink and checks the rows. */
-  private void testEvents(int eventCount, SchemaLocation schemaLocation)
-      throws Exception {
-    KuduTable table = createNewTable(
-        String.format("test%sevents%s", eventCount, schemaLocation));
-    String tableName = table.getName();
-
-    // Only the GLOBAL variant puts the schema into the producer configuration.
-    Context context;
-    if (schemaLocation == SchemaLocation.GLOBAL) {
-      context = new Context(ImmutableMap.of(PRODUCER_PREFIX + SCHEMA_PROP, schemaUriString));
-    } else {
-      context = new Context();
-    }
-    context.put(PRODUCER, AvroKuduOperationsProducer.class.getName());
-
-    List<Event> events = generateEvents(eventCount, schemaLocation);
-    KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
-
-    List<String> expected = makeAnswers(eventCount);
-    List<String> actual = scanTableToStrings(table);
-    assertEquals("wrong number of rows inserted", expected.size(), actual.size());
-    assertArrayEquals("wrong rows inserted", expected.toArray(), actual.toArray());
-  }
-
-  /** Creates a single-replica table, range-partitioned on "key", with the test columns. */
-  private KuduTable createNewTable(String tableName) throws Exception {
-    ColumnSchema key =
-        new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build();
-    ColumnSchema longField =
-        new ColumnSchema.ColumnSchemaBuilder("longField", Type.INT64).build();
-    ColumnSchema doubleField =
-        new ColumnSchema.ColumnSchemaBuilder("doubleField", Type.DOUBLE).build();
-    ColumnSchema nullableField =
-        new ColumnSchema.ColumnSchemaBuilder("nullableField", Type.STRING)
-            .nullable(true).build();
-    ColumnSchema stringField =
-        new ColumnSchema.ColumnSchemaBuilder("stringField", Type.STRING).build();
-    ColumnSchema decimalField =
-        new ColumnSchema.ColumnSchemaBuilder("decimalField", Type.DECIMAL)
-            .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build();
-    List<ColumnSchema> columns = ImmutableList.of(
-        key, longField, doubleField, nullableField, stringField, decimalField);
-
-    CreateTableOptions createOptions = new CreateTableOptions()
-        .setRangePartitionColumns(ImmutableList.of("key"))
-        .setNumReplicas(1);
-    return harness.getClient().createTable(tableName, new Schema(columns), createOptions);
-  }
-
-  /** Builds {@code eventCount} events; URL and LITERAL variants carry the schema in a header. */
-  private List<Event> generateEvents(int eventCount,
-                                     SchemaLocation schemaLocation) throws Exception {
-    List<Event> events = new ArrayList<>();
-    for (int i = 0; i < eventCount; i++) {
-      Event event = EventBuilder.withBody(serialize(newRecord(i)));
-      switch (schemaLocation) {
-        case URL:
-          event.setHeaders(ImmutableMap.of(SCHEMA_URL_HEADER, schemaUriString));
-          break;
-        case LITERAL:
-          event.setHeaders(ImmutableMap.of(SCHEMA_LITERAL_HEADER, schemaLiteral));
-          break;
-        default:
-          // GLOBAL: the schema comes from the producer configuration instead.
-          break;
-      }
-      events.add(event);
-    }
-    return events;
-  }
-
-  /** Creates the record for index {@code i}; the values mirror those in makeAnswers(). */
-  private static AvroKuduOperationsProducerTestRecord newRecord(int i) {
-    AvroKuduOperationsProducerTestRecord record = new AvroKuduOperationsProducerTestRecord();
-    record.setKey(10 * i);
-    record.setLongField(2L * i);
-    record.setDoubleField(2.71828 * i);
-    record.setNullableField(i % 2 == 0 ? null : "taco");
-    record.setStringField(String.format("hello %d", i));
-    record.setDecimalField(BigDecimal.valueOf(i, 1));
-    return record;
-  }
-
-  /** Encodes {@code record} with the Avro binary encoder. */
-  private static byte[] serialize(AvroKuduOperationsProducerTestRecord record)
-      throws Exception {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
-    DatumWriter<AvroKuduOperationsProducerTestRecord> writer =
-        new SpecificDatumWriter<>(AvroKuduOperationsProducerTestRecord.class);
-    writer.write(record, encoder);
-    encoder.flush();
-    return out.toByteArray();
-  }
-
-  /** Returns the expected row strings for {@code eventCount} events, sorted. */
-  private List<String> makeAnswers(int eventCount) {
-    final String rowFormat =
-        "INT32 key=%s, INT64 longField=%s, DOUBLE doubleField=%s, " +
-        "STRING nullableField=%s, STRING stringField=hello %s, " +
-        "DECIMAL decimalField(9, 1)=%s";
-    List<String> expected = Lists.newArrayList();
-    for (int i = 0; i < eventCount; i++) {
-      String nullableText = (i % 2 == 0) ? "NULL" : "taco";
-      expected.add(String.format(rowFormat,
-          10 * i,
-          2 * i,
-          2.71828 * i,
-          nullableText,
-          i,
-          BigDecimal.valueOf(i, 1)));
-    }
-    Collections.sort(expected);
-    return expected;
-  }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
deleted file mode 100644
index 58200fc..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KeyedKuduOperationsProducerTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.kudu.flume.sink;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.KEY_COLUMN_DEFAULT;
-import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.OPERATION_PROP;
-import static org.apache.kudu.flume.sink.SimpleKeyedKuduOperationsProducer.PAYLOAD_COLUMN_DEFAULT;
-import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.test.KuduTestHarness;
-
-public class KeyedKuduOperationsProducerTest {
- private static final Logger LOG = LoggerFactory.getLogger(KeyedKuduOperationsProducerTest.class);
-
- @Rule
- public KuduTestHarness harness = new KuduTestHarness();
-
- private KuduTable createNewTable(String tableName) throws Exception {
- LOG.info("Creating new table...");
-
- ArrayList<ColumnSchema> columns = new ArrayList<>(2);
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder(KEY_COLUMN_DEFAULT, Type.STRING)
- .key(true).build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder(PAYLOAD_COLUMN_DEFAULT, Type.BINARY)
- .key(false).build());
- CreateTableOptions createOptions =
- new CreateTableOptions().setRangePartitionColumns(ImmutableList.of(KEY_COLUMN_DEFAULT))
- .setNumReplicas(1);
- KuduTable table =
- harness.getClient().createTable(tableName, new Schema(columns), createOptions);
-
- LOG.info("Created new table.");
-
- return table;
- }
-
- @Test
- public void testEmptyChannelWithInsert() throws Exception {
- testEvents(0, "insert");
- }
-
- @Test
- public void testOneEventWithInsert() throws Exception {
- testEvents(1, "insert");
- }
-
- @Test
- public void testThreeEventsWithInsert() throws Exception {
- testEvents(3, "insert");
- }
-
- @Test
- public void testEmptyChannelWithUpsert() throws Exception {
- testEvents(0, "upsert");
- }
-
- @Test
- public void testOneEventWithUpsert() throws Exception {
- testEvents(1, "upsert");
- }
-
- @Test
- public void testThreeEventsWithUpsert() throws Exception {
- testEvents(3, "upsert");
- }
-
- @Test
- public void testDuplicateRowsWithUpsert() throws Exception {
- LOG.info("Testing events with upsert...");
-
- KuduTable table = createNewTable("testDupUpsertEvents");
- String tableName = table.getName();
- Context ctx = new Context(ImmutableMap.of(
- PRODUCER_PREFIX + OPERATION_PROP, "upsert",
- PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()
- ));
- KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName, ctx);
- sink.start();
-
- int numRows = 3;
- List<Event> events = new ArrayList<>();
- for (int i = 0; i < numRows; i++) {
- Event e = EventBuilder.withBody(String.format("payload body %s", i), UTF_8);
- e.setHeaders(ImmutableMap.of(KEY_COLUMN_DEFAULT, String.format("key %s", i)));
- events.add(e);
- }
-
- KuduSinkTestUtil.processEvents(sink, events);
-
- List<String> rows = scanTableToStrings(table);
- assertEquals(numRows + " row(s) expected", numRows, rows.size());
-
- for (int i = 0; i < numRows; i++) {
- assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
- }
-
- Event dup = EventBuilder.withBody("payload body upserted".getBytes(UTF_8));
- dup.setHeaders(ImmutableMap.of("key", String.format("key %s", 0)));
-
- KuduSinkTestUtil.processEvents(sink, ImmutableList.of(dup));
-
- List<String> upRows = scanTableToStrings(table);
- assertEquals(numRows + " row(s) expected", numRows, upRows.size());
-
- assertTrue("incorrect payload", upRows.get(0).contains("payload body upserted"));
- for (int i = 1; i < numRows; i++) {
- assertTrue("incorrect payload", upRows.get(i).contains("payload body " + i));
- }
-
- LOG.info("Testing events with upsert finished successfully.");
- }
-
- private void testEvents(int eventCount, String operation) throws Exception {
- LOG.info("Testing {} events...", eventCount);
-
- KuduTable table = createNewTable("test" + eventCount + "events" + operation);
- String tableName = table.getName();
- Context context = new Context(ImmutableMap.of(
- PRODUCER_PREFIX + OPERATION_PROP, operation,
- PRODUCER, SimpleKeyedKuduOperationsProducer.class.getName()
- ));
-
- List<Event> events = getEvents(eventCount);
-
- KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
-
- List<String> rows = scanTableToStrings(table);
- assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
-
- for (int i = 0; i < eventCount; i++) {
- assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
- }
-
- LOG.info("Testing {} events finished successfully.", eventCount);
- }
-
- private List<Event> getEvents(int eventCount) {
- List<Event> events = new ArrayList<>();
- for (int i = 0; i < eventCount; i++) {
- Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
- e.setHeaders(ImmutableMap.of("key", String.format("key %s", i)));
- events.add(e);
- }
- return events;
- }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
deleted file mode 100644
index 89b8c1a..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.kudu.flume.sink;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Sink;
-import org.apache.flume.Sink.Status;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.test.KuduTestHarness;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-
-public class KuduSinkTest {
- private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTest.class);
-
- @Rule
- public KuduTestHarness harness = new KuduTestHarness();
-
- private KuduTable createNewTable(String tableName) throws Exception {
- LOG.info("Creating new table...");
-
- ArrayList<ColumnSchema> columns = new ArrayList<>(1);
- columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
- CreateTableOptions createOptions =
- new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
- .setNumReplicas(1);
- KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions);
-
- LOG.info("Created new table.");
-
- return table;
- }
-
- @Test
- public void testMandatoryParameters() {
- LOG.info("Testing mandatory parameters...");
-
- KuduSink sink = new KuduSink(harness.getClient());
-
- HashMap<String, String> parameters = new HashMap<>();
- Context context = new Context(parameters);
- try {
- Configurables.configure(sink, context);
- Assert.fail("Should have failed due to missing properties");
- } catch (NullPointerException npe) {
- //good
- }
-
- parameters.put(KuduSinkConfigurationConstants.TABLE_NAME, "tableName");
- context = new Context(parameters);
- try {
- Configurables.configure(sink, context);
- Assert.fail("Should have failed due to missing properties");
- } catch (NullPointerException npe) {
- //good
- }
-
- LOG.info("Testing mandatory parameters finished successfully.");
- }
-
- @Test(expected = FlumeException.class)
- public void testMissingTable() {
- LOG.info("Testing missing table...");
-
- KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), "missingTable", new Context());
- sink.start();
-
- LOG.info("Testing missing table finished successfully.");
- }
-
- @Test
- public void testEmptyChannelWithDefaults() throws Exception {
- testEventsWithDefaults(0);
- }
-
- @Test
- public void testOneEventWithDefaults() throws Exception {
- testEventsWithDefaults(1);
- }
-
- @Test
- public void testThreeEventsWithDefaults() throws Exception {
- testEventsWithDefaults(3);
- }
-
- @Test
- public void testDuplicateRowsWithDuplicatesIgnored() throws Exception {
- doTestDuplicateRows(true);
- }
-
- @Test
- public void testDuplicateRowsWithDuplicatesNotIgnored() throws Exception {
- doTestDuplicateRows(false);
- }
-
- private void doTestDuplicateRows(boolean ignoreDuplicateRows) throws Exception {
- KuduTable table = createNewTable("testDuplicateRows" + ignoreDuplicateRows);
- String tableName = table.getName();
- Context sinkContext = new Context();
- sinkContext.put(KuduSinkConfigurationConstants.IGNORE_DUPLICATE_ROWS,
- Boolean.toString(ignoreDuplicateRows));
- KuduSink sink = KuduSinkTestUtil.createSink(harness.getClient(), tableName, sinkContext);
- sink.start();
- Channel channel = sink.getChannel();
-
- Transaction tx = channel.getTransaction();
- tx.begin();
-
- for (int i = 0; i < 2; i++) {
- Event e = EventBuilder.withBody("key-0", UTF_8); // Duplicate keys.
- channel.put(e);
- }
-
- tx.commit();
- tx.close();
-
- try {
- Sink.Status status = sink.process();
- if (!ignoreDuplicateRows) {
- fail("Incorrectly ignored duplicate rows!");
- }
- assertSame("incorrect status for empty channel", status, Status.READY);
- } catch (EventDeliveryException e) {
- if (ignoreDuplicateRows) {
- throw new AssertionError("Failed to ignore duplicate rows!", e);
- } else {
- LOG.info("Correctly did not ignore duplicate rows", e);
- return;
- }
- }
-
- // We only get here if the process() succeeded.
- try {
- List<String> rows = scanTableToStrings(table);
- assertEquals("1 row expected", 1, rows.size());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- LOG.info("Testing duplicate events finished successfully.");
- }
-
- private void testEventsWithDefaults(int eventCount) throws Exception {
- LOG.info("Testing {} events...", eventCount);
-
- KuduTable table = createNewTable("test" + eventCount + "events");
- String tableName = table.getName();
-
- List<Event> events = new ArrayList<>();
-
- for (int i = 0; i < eventCount; i++) {
- Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
- events.add(e);
- }
-
- KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), new Context(), tableName, events);
-
- List<String> rows = scanTableToStrings(table);
- assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
-
- for (int i = 0; i < eventCount; i++) {
- assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
- }
-
- LOG.info("Testing {} events finished successfully.", eventCount);
- }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java
deleted file mode 100644
index 3f5cfbf..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/KuduSinkTestUtil.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.flume.sink;
-
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_KEYTAB;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.KERBEROS_PRINCIPAL;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.MASTER_ADDRESSES;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.TABLE_NAME;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-
-import java.util.List;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink.Status;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.client.KuduClient;
-
-class KuduSinkTestUtil {
- private static final Logger LOG = LoggerFactory.getLogger(KuduSinkTestUtil.class);
-
- static KuduSink createSink(KuduClient client, String tableName, Context ctx) {
- return createSink(tableName, client, ctx, client.getMasterAddressesAsString());
- }
-
- private static KuduSink createSink(
- String tableName, KuduClient client, Context ctx, String masterAddresses) {
- LOG.info("Creating Kudu sink for '{}' table...", tableName);
-
- Context context = new Context();
- context.put(TABLE_NAME, tableName);
- context.put(MASTER_ADDRESSES, masterAddresses);
- context.putAll(ctx.getParameters());
- KuduSink sink = new KuduSink(client);
- Configurables.configure(sink, context);
- Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
- sink.setChannel(channel);
-
- LOG.info("Created Kudu sink for '{}' table.", tableName);
-
- return sink;
- }
-
- static KuduSink createSecureSink(String tableName, String masterAddresses, String clusterRoot) {
- Context context = new Context();
- context.put(KERBEROS_KEYTAB, clusterRoot + "/krb5kdc/test-user.keytab");
- context.put(KERBEROS_PRINCIPAL, "test-user@KRBTEST.COM");
-
- return createSink(tableName, null, context, masterAddresses);
- }
-
- static void processEventsCreatingSink(
- KuduClient syncClient, Context context, String tableName, List<Event> events
- ) throws EventDeliveryException {
- KuduSink sink = createSink(syncClient, tableName, context);
- sink.start();
- processEvents(sink, events);
- }
-
- static void processEvents(KuduSink sink, List<Event> events) throws EventDeliveryException {
- Channel channel = sink.getChannel();
- Transaction tx = channel.getTransaction();
- tx.begin();
- for (Event e : events) {
- channel.put(e);
- }
- tx.commit();
- tx.close();
-
- Status status = sink.process();
- if (events.isEmpty()) {
- assertSame("incorrect status for empty channel", status, Status.BACKOFF);
- } else {
- assertNotSame("incorrect status for non-empty channel", status, Status.BACKOFF);
- }
- }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
deleted file mode 100644
index 37e33c2..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerParseErrorTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.flume.sink;
-
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.BAD_COLUMN_VALUE_POLICY_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.MISSING_COLUMN_POLICY_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.OPERATION_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.PATTERN_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.SKIP_BAD_COLUMN_VALUE_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.SKIP_MISSING_COLUMN_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.UNMATCHED_ROW_POLICY_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.WARN_UNMATCHED_ROWS_PROP;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.Closeable;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.flume.Context;
-import org.apache.flume.FlumeException;
-import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.test.KuduTestHarness;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.RuleChain;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.test.CapturingLogAppender;
-
-public class RegexpKuduOperationsProducerParseErrorTest {
- private static final String TEST_REGEXP = "(?<key>\\d+),(?<byteFld>\\d+),(?<stringFld>\\w+)";
- private static final String TEST_REGEXP_MISSING_COLUMN = "(?<key>\\d+),(?<byteFld>\\d+)";
- private static final String TEST_OPERATION = "insert";
-
- private static final String ROW_UNMATCHING = "invalid row";
- private static final String ROW_BAD_COLUMN_VALUE = "1,1000,string";
- private static final String ROW_MISSING_COLUMN = "1,1";
-
- private static final String ERROR_MSG_UNMATCHED_ROW =
- "Failed to match the pattern '" + TEST_REGEXP + "' in '" + ROW_UNMATCHING + "'";
- private static final String ERROR_MSG_MISSING_COLUMN =
- "Column 'stringFld' has no matching group in '" + ROW_MISSING_COLUMN + "'";
- private static final String ERROR_MSG_BAD_COLUMN_VALUE =
- "Raw value '" + ROW_BAD_COLUMN_VALUE +
- "' couldn't be parsed to type Type: int8 for column 'byteFld'";
-
- private static final String POLICY_REJECT = "REJECT";
- private static final String POLICY_WARN = "WARN";
- private static final String POLICY_IGNORE = "IGNORE";
-
- public KuduTestHarness harness = new KuduTestHarness();
- public ExpectedException thrown = ExpectedException.none();
-
- // ExpectedException misbehaves when combined with other rules; we use a
- // RuleChain to beat it into submission.
- //
- // See https://stackoverflow.com/q/28846088 for more information.
- @Rule
- public RuleChain chain = RuleChain.outerRule(harness).around(thrown);
-
- @Test
- public void testMissingColumnThrowsExceptionDefaultConfig() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
- testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
- }
-
- @Test
- public void testMissingColumnThrowsExceptionDeprecated() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
- additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(false));
- testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
- }
-
- @Test
- public void testMissingColumnThrowsException() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
- additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_REJECT);
- testThrowsException(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
- }
-
- @Test
- public void testMissingColumnLogsWarningDeprecated() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
- additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(true));
- testLogging(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
- }
-
- @Test
- public void testMissingColumnLogsWarning() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
- additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_WARN);
- testLogging(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
- }
-
-
- @Test
- public void testMissingColumnIgnored() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(PATTERN_PROP, TEST_REGEXP_MISSING_COLUMN);
- additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_IGNORE);
- testIgnored(additionalContext, ERROR_MSG_MISSING_COLUMN, ROW_MISSING_COLUMN);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testMissingColumnConfigValidation() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(SKIP_MISSING_COLUMN_PROP, String.valueOf(false));
- additionalContext.put(MISSING_COLUMN_POLICY_PROP, POLICY_IGNORE);
- getProducer(additionalContext);
- }
-
- @Test
- public void testBadColumnValueThrowsExceptionDefaultConfig() throws Exception {
- Context additionalContext = new Context();
- testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
- }
-
- @Test
- public void testBadColumnValueThrowsExceptionDeprecated() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(false));
- testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
- }
-
- @Test
- public void testBadColumnValueThrowsException() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_REJECT);
- testThrowsException(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
- }
-
- @Test
- public void testBadColumnValueLogsWarningDeprecated() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(true));
- testLogging(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
- }
-
- @Test
- public void testBadColumnValueLogsWarning() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_WARN);
- testLogging(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
- }
-
- @Test
- public void testBadColumnValueIgnored() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_IGNORE);
- testIgnored(additionalContext, ERROR_MSG_BAD_COLUMN_VALUE, ROW_BAD_COLUMN_VALUE);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testBadColumnValueConfigValidation() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(SKIP_BAD_COLUMN_VALUE_PROP, String.valueOf(false));
- additionalContext.put(BAD_COLUMN_VALUE_POLICY_PROP, POLICY_IGNORE);
- getProducer(additionalContext);
- }
-
- @Test
- public void testUnmatchedRowLogsWarningWithDefaultConfig() throws Exception {
- Context additionalContext = new Context();
- testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
- }
-
- @Test
- public void testUnmatchedRowThrowsException() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_REJECT);
- testThrowsException(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
- }
-
- @Test
- public void testUnmatchedRowLogsWarningDeprecated() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(true));
- testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
- }
-
- @Test
- public void testUnmatchedRowLogsWarning() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_WARN);
- testLogging(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
- }
-
- @Test
- public void testUnmatchedRowIgnoredDeprecated() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(false));
- testIgnored(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
- }
-
- @Test
- public void testUnmatchedRowIgnored() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_IGNORE);
- testIgnored(additionalContext, ERROR_MSG_UNMATCHED_ROW, ROW_UNMATCHING);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testUnmatchedRowConfigValidation() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(WARN_UNMATCHED_ROWS_PROP, String.valueOf(false));
- additionalContext.put(UNMATCHED_ROW_POLICY_PROP, POLICY_IGNORE);
- getProducer(additionalContext);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testUnKnownPolicyConfigValidation() throws Exception {
- Context additionalContext = new Context();
- additionalContext.put(UNMATCHED_ROW_POLICY_PROP, "FORCED");
- getProducer(additionalContext);
- }
-
- private void testLogging(
- Context additionalContext, String expectedError, String eventBody) throws Exception {
- String appendedText = processEvent(additionalContext, eventBody);
- assertTrue(appendedText.contains(expectedError));
- }
-
- private void testIgnored(
- Context additionalContext, String expectedError, String eventBody) throws Exception {
- String appendedText = processEvent(additionalContext, eventBody);
- assertFalse(appendedText.contains(expectedError));
- }
-
- private void testThrowsException(
- Context additionalContext, String expectedError, String eventBody) throws Exception {
- thrown.expect(FlumeException.class);
- thrown.expectMessage(expectedError);
- processEvent(additionalContext, eventBody);
- }
-
- private String processEvent(Context additionalContext, String eventBody) throws Exception {
- CapturingLogAppender appender = new CapturingLogAppender();
- RegexpKuduOperationsProducer producer = getProducer(additionalContext);
- try (Closeable c = appender.attach()) {
- producer.getOperations(EventBuilder.withBody(eventBody.getBytes(Charset.forName("UTF-8"))));
- }
- return appender.getAppendedText();
- }
-
-
- private RegexpKuduOperationsProducer getProducer(Context additionalContext) throws Exception {
- RegexpKuduOperationsProducer producer = new RegexpKuduOperationsProducer();
- producer.initialize(createNewTable("test"));
- Context context = new Context();
- context.put(PATTERN_PROP, TEST_REGEXP);
- context.put(OPERATION_PROP, TEST_OPERATION);
- context.putAll(additionalContext.getParameters());
- producer.configure(context);
-
- return producer;
- }
-
- private KuduTable createNewTable(String tableName) throws Exception {
- ArrayList<ColumnSchema> columns = new ArrayList<>(10);
- columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("byteFld", Type.INT8).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
- CreateTableOptions createOptions =
- new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
- KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions);
- return table;
- }
-
-
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
deleted file mode 100644
index f063349..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/RegexpKuduOperationsProducerTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.kudu.flume.sink;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER;
-import static org.apache.kudu.flume.sink.KuduSinkConfigurationConstants.PRODUCER_PREFIX;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.OPERATION_PROP;
-import static org.apache.kudu.flume.sink.RegexpKuduOperationsProducer.PATTERN_PROP;
-import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.test.KuduTestHarness;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.util.DecimalUtil;
-
-public class RegexpKuduOperationsProducerTest {
- private static final String TEST_REGEXP =
- "(?<key>\\d+),(?<byteFld>\\d+),(?<shortFld>\\d+),(?<intFld>\\d+)," +
- "(?<longFld>\\d+),(?<binaryFld>\\w+),(?<stringFld>\\w+),(?<boolFld>\\w+)," +
- "(?<floatFld>\\d+\\.\\d*),(?<doubleFld>\\d+.\\d*),(?<decimalFld>\\d+.\\d*)";
-
- @Rule
- public KuduTestHarness harness = new KuduTestHarness();
-
- private KuduTable createNewTable(String tableName) throws Exception {
- ArrayList<ColumnSchema> columns = new ArrayList<>(10);
- columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("byteFld", Type.INT8).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("shortFld", Type.INT16).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("intFld", Type.INT32).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("longFld", Type.INT64).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("binaryFld", Type.BINARY).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("stringFld", Type.STRING).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("boolFld", Type.BOOL).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("floatFld", Type.FLOAT).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("doubleFld", Type.DOUBLE).build());
- columns.add(new ColumnSchema.ColumnSchemaBuilder("decimalFld", Type.DECIMAL)
- .typeAttributes(DecimalUtil.typeAttributes(9, 1)).build());
- CreateTableOptions createOptions =
- new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3).setNumReplicas(1);
- return harness.getClient().createTable(tableName, new Schema(columns), createOptions);
- }
-
- @Test
- public void testEmptyChannel() throws Exception {
- testEvents(0, 1, "insert");
- }
-
- @Test
- public void testOneEvent() throws Exception {
- testEvents(1, 1, "insert");
- }
-
- @Test
- public void testThreeEvents() throws Exception {
- testEvents(3, 1, "insert");
- }
-
- @Test
- public void testThreeEventsWithUpsert() throws Exception {
- testEvents(3, 1, "upsert");
- }
-
- @Test
- public void testOneEventTwoRowsEach() throws Exception {
- testEvents(1, 2, "insert");
- }
-
- @Test
- public void testTwoEventsTwoRowsEach() throws Exception {
- testEvents(2, 2, "insert");
- }
-
- @Test
- public void testTwoEventsTwoRowsEachWithUpsert() throws Exception {
- testEvents(2, 2, "upsert");
- }
-
- private void testEvents(int eventCount, int perEventRowCount, String operation) throws Exception {
- String tableName = String.format("test%sevents%srowseach%s",
- eventCount, perEventRowCount, operation);
- Context context = new Context();
- context.put(PRODUCER_PREFIX + PATTERN_PROP, TEST_REGEXP);
- context.put(PRODUCER_PREFIX + OPERATION_PROP, operation);
- context.put(PRODUCER, RegexpKuduOperationsProducer.class.getName());
- KuduTable table = createNewTable(tableName);
-
- List<Event> events = generateEvents(eventCount, perEventRowCount, operation);
-
- KuduSinkTestUtil.processEventsCreatingSink(harness.getClient(), context, tableName, events);
-
- List<String> rows = scanTableToStrings(table);
- assertEquals(eventCount * perEventRowCount + " row(s) expected",
- eventCount * perEventRowCount,
- rows.size());
-
- ArrayList<String> rightAnswers = new ArrayList<>(eventCount * perEventRowCount);
- for (int i = 0; i < eventCount; i++) {
- for (int j = 0; j < perEventRowCount; j++) {
- int value = operation.equals("upsert") && i == 0 ? 1 : i;
- String baseAnswer = "INT32 key=1%2$d%3$d1, INT8 byteFld=%1$d, INT16 shortFld=%1$d, " +
- "INT32 intFld=%1$d, INT64 longFld=%1$d, BINARY binaryFld=\"binary\", " +
- "STRING stringFld=string, BOOL boolFld=false, FLOAT floatFld=%1$d.%1$d, " +
- "DOUBLE doubleFld=%1$d.%1$d, DECIMAL decimalFld(9, 1)=%1$d.%1$d";
- String rightAnswer = String.format(baseAnswer, value, i, j);
- rightAnswers.add(rightAnswer);
- }
- }
- Collections.sort(rightAnswers);
-
- for (int k = 0; k < eventCount * perEventRowCount; k++) {
- assertEquals("incorrect row", rightAnswers.get(k), rows.get(k));
- }
- }
-
- private List<Event> generateEvents(int eventCount, int perEventRowCount, String operation) {
- List<Event> events = new ArrayList<>();
-
- for (int i = 0; i < eventCount; i++) {
- StringBuilder payload = new StringBuilder();
- for (int j = 0; j < perEventRowCount; j++) {
- String baseRow = "|1%1$d%2$d1,%1$d,%1$d,%1$d,%1$d,binary," +
- "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d,%1$d|";
- String row = String.format(baseRow, i, j);
- payload.append(row);
- }
- Event e = EventBuilder.withBody(payload.toString().getBytes(UTF_8));
- events.add(e);
- }
-
- if (eventCount > 0) {
- // In the upsert case, add one upsert row per insert event (i.e. per i)
- // All such rows go in one event.
- if (operation.equals("upsert")) {
- StringBuilder upserts = new StringBuilder();
- for (int j = 0; j < perEventRowCount; j++) {
- String row = String.format("|1%2$d%3$d1,%1$d,%1$d,%1$d,%1$d,binary," +
- "string,false,%1$d.%1$d,%1$d.%1$d,%1$d.%1$d,%1$d|", 1, 0, j);
- upserts.append(row);
- }
- Event e = EventBuilder.withBody(upserts.toString().getBytes(UTF_8));
- events.add(e);
- }
-
- // Also check some bad/corner cases.
- String mismatchInInt = "|1,2,taco,4,5,x,y,true,1.0.2.0,999|";
- String emptyString = "";
- String[] testCases = {mismatchInInt, emptyString};
- for (String testCase : testCases) {
- Event e = EventBuilder.withBody(testCase.getBytes(UTF_8));
- events.add(e);
- }
- }
- return events;
- }
-}
diff --git a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java b/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
deleted file mode 100644
index 11aa07b..0000000
--- a/java/kudu-flume-sink/src/test/java/org/apache/kudu/flume/sink/SecureKuduSinkTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kudu.flume.sink;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
-import static org.apache.kudu.util.SecurityUtil.KUDU_TICKETCACHE_PROPERTY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.event.EventBuilder;
-import org.apache.kudu.test.KuduTestHarness;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.test.cluster.MiniKuduCluster.MiniKuduClusterBuilder;
-
-public class SecureKuduSinkTest {
- private static final Logger LOG = LoggerFactory.getLogger(SecureKuduSinkTest.class);
- private static final int TICKET_LIFETIME_SECONDS = 20;
- private static final int RENEWABLE_LIFETIME_SECONDS = 35;
-
- private static final MiniKuduClusterBuilder clusterBuilder = KuduTestHarness.getBaseClusterBuilder()
- .kdcTicketLifetime(TICKET_LIFETIME_SECONDS + "s")
- .kdcRenewLifetime(RENEWABLE_LIFETIME_SECONDS + "s")
- .enableKerberos();
-
- @Rule
- public KuduTestHarness harness = new KuduTestHarness(clusterBuilder);
-
- @Before
- public void clearTicketCacheProperty() {
- // Let Flume authenticate.
- System.clearProperty(KUDU_TICKETCACHE_PROPERTY);
- }
-
- @Test
- public void testEventsWithShortTickets() throws Exception {
- Instant start = Instant.now();
- LOG.info("Creating new table...");
- ArrayList<ColumnSchema> columns = new ArrayList<>(1);
- columns.add(new ColumnSchema.ColumnSchemaBuilder("payload", Type.BINARY).key(true).build());
- CreateTableOptions createOptions =
- new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("payload"))
- .setNumReplicas(1);
- String tableName = "test_long_lived_events";
- KuduTable table = harness.getClient().createTable(tableName, new Schema(columns), createOptions);
- LOG.info("Created new table.");
-
- KuduSink sink = KuduSinkTestUtil.createSecureSink(
- tableName, harness.getMasterAddressesAsString(), harness.getClusterRoot());
- sink.start();
-
- LOG.info("Testing events at the beginning.");
- int eventCount = 10;
-
- processEvents(sink, 0, eventCount / 2);
-
- LOG.info("Waiting for tickets to expire");
- Duration elapsedSoFar = Duration.between(Instant.now(), start);
- TimeUnit.MILLISECONDS.sleep(1000 * (RENEWABLE_LIFETIME_SECONDS + 1) - elapsedSoFar.toMillis());
- // At this point, the ticket will have been outstanding for at least
- // (RENEWABLE_LIFETIME_SECONDS + 1) seconds-- so the sink will need to reacquire a ticket.
-
- LOG.info("Testing events after ticket renewal.");
- processEvents(sink, eventCount / 2, eventCount);
-
- List<String> rows = scanTableToStrings(table);
- assertEquals(eventCount + " row(s) expected", eventCount, rows.size());
-
- for (int i = 0; i < eventCount; i++) {
- assertTrue("incorrect payload", rows.get(i).contains("payload body " + i));
- }
-
- LOG.info("Testing {} events finished successfully.", eventCount);
- }
-
- private void processEvents(KuduSink sink, int from, int to) throws EventDeliveryException {
- List<Event> events = new ArrayList<>();
- for (int i = from; i < to; i++) {
- Event e = EventBuilder.withBody(String.format("payload body %s", i).getBytes(UTF_8));
- events.add(e);
- }
-
- KuduSinkTestUtil.processEvents(sink, events);
- LOG.info("Events flushed.");
- }
-}
diff --git a/java/kudu-flume-sink/src/test/resources/log4j2.properties b/java/kudu-flume-sink/src/test/resources/log4j2.properties
deleted file mode 100644
index 22762a1..0000000
--- a/java/kudu-flume-sink/src/test/resources/log4j2.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-status = error
-name = PropertiesConfig
-appenders = console
-
-appender.console.type = Console
-appender.console.name = STDOUT
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%p - %t] (%F:%L) %m%n
-
-rootLogger.level = info
-rootLogger.appenderRefs = stdout
-rootLogger.appenderRef.stdout.ref = STDOUT
-
-logger.kudu.name = org.apache.kudu
-logger.kudu.level = debug
diff --git a/java/kudu-flume-sink/src/test/resources/testAvroKuduOperationsProducer.avsc b/java/kudu-flume-sink/src/test/resources/testAvroKuduOperationsProducer.avsc
deleted file mode 100644
index 6bcf6d2..0000000
--- a/java/kudu-flume-sink/src/test/resources/testAvroKuduOperationsProducer.avsc
+++ /dev/null
@@ -1,14 +0,0 @@
-{"namespace": "org.apache.kudu.flume.sink",
- "type": "record",
- "name": "AvroKuduOperationsProducerTestRecord",
- "fields": [
- {"name": "key", "type": "int"},
- {"name": "longField", "type": "long"},
- {"name": "doubleField", "type": "double"},
- {"name": "nullableField", "type": ["string", "null"]},
- {"name": "stringField", "type": "string"},
- {"name": "decimalField", "type": {
- "type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 1}
- }
- ]
-}
\ No newline at end of file
diff --git a/java/settings.gradle b/java/settings.gradle
index 8a8bcfc..a89c703 100644
--- a/java/settings.gradle
+++ b/java/settings.gradle
@@ -24,7 +24,6 @@ include "kudu-backup-common"
include "kudu-backup-tools"
include "kudu-client"
include "kudu-client-tools"
-include "kudu-flume-sink"
include "kudu-hive"
include "kudu-jepsen"
include "kudu-mapreduce"