Posted to commits@flume.apache.org by hs...@apache.org on 2015/02/10 08:16:18 UTC
[2/4] flume git commit: FLUME-1734. Add a Hive Sink based on Hive Streaming support.
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat
new file mode 100644
index 0000000..fd69350
Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat differ
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat
new file mode 100644
index 0000000..9ae5d87
Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat differ
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat
new file mode 100644
index 0000000..762cc2a
Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat differ
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat
new file mode 100644
index 0000000..fd69350
Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat differ
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties b/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties
new file mode 100644
index 0000000..e385a7b
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties
@@ -0,0 +1,22 @@
+#/Users/hshreedharan/work/flume-latest/flume/flume-ng-sinks/flume-hive-sink/metastore_db
+# ********************************************************************
+# *** Please do NOT edit this file. ***
+# *** CHANGING THE CONTENT OF THIS FILE MAY CAUSE DATA CORRUPTION. ***
+# ********************************************************************
+#Mon Feb 09 20:19:39 PST 2015
+SysschemasIndex2Identifier=225
+SyscolumnsIdentifier=144
+SysconglomeratesIndex1Identifier=49
+SysconglomeratesIdentifier=32
+SyscolumnsIndex2Identifier=177
+SysschemasIndex1Identifier=209
+SysconglomeratesIndex3Identifier=81
+SystablesIndex2Identifier=129
+SyscolumnsIndex1Identifier=161
+derby.serviceProtocol=org.apache.derby.database.Database
+SysschemasIdentifier=192
+derby.storage.propertiesId=16
+SysconglomeratesIndex2Identifier=65
+derby.serviceLocale=en_US
+SystablesIdentifier=96
+SystablesIndex1Identifier=113
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/pom.xml b/flume-ng-sinks/flume-hive-sink/pom.xml
new file mode 100644
index 0000000..e5f673a
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/pom.xml
@@ -0,0 +1,189 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sinks</artifactId>
+ <version>1.6.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-hive-sink</artifactId>
+ <name>Flume NG Hive Sink</name>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop-1.0</id>
+ <activation>
+ <property>
+ <name>!hadoop.profile</name>
+ </property>
+ </activation>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-2</id>
+ <activation>
+ <property>
+ <name>hadoop.profile</name>
+ <value>2</value>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>test</scope>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <profile>
+ <id>hbase-98</id>
+ <activation>
+ <property>
+ <name>hadoop.profile</name>
+ <value>hbase-98</value>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <scope>test</scope>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-configuration</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-streaming</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <scope>provided</scope>
+ <version>${hive.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-cli</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!--temporary - really belongs to hive-streaming : roshan -->
+ <dependency>
+ <groupId>xerces</groupId>
+ <artifactId>xercesImpl</artifactId>
+ <scope>runtime</scope>
+ <version>2.9.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>xalan</groupId>
+ <artifactId>serializer</artifactId>
+ <version>2.7.1</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>xalan</groupId>
+ <artifactId>xalan</artifactId>
+ <scope>runtime</scope>
+ <version>2.7.1</version>
+ </dependency>
+ <!-- end temporary -->
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java
new file mode 100644
index 0000000..b2d2582
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+public class Config {
+ public static final String HIVE_METASTORE = "hive.metastore";
+ public static final String HIVE_DATABASE = "hive.database";
+ public static final String HIVE_TABLE = "hive.table";
+ public static final String HIVE_PARTITION = "hive.partition";
+ public static final String HIVE_TXNS_PER_BATCH_ASK = "hive.txnsPerBatchAsk";
+ public static final String BATCH_SIZE = "batchSize";
+ public static final String IDLE_TIMEOUT = "idleTimeout";
+ public static final String CALL_TIMEOUT = "callTimeout";
+ public static final String HEART_BEAT_INTERVAL = "heartBeatInterval";
+ public static final String MAX_OPEN_CONNECTIONS = "maxOpenConnections";
+ public static final String USE_LOCAL_TIME_STAMP = "useLocalTimeStamp";
+ public static final String TIME_ZONE = "timeZone";
+ public static final String ROUND_UNIT = "roundUnit";
+ public static final String ROUND = "round";
+ public static final String HOUR = "hour";
+ public static final String MINUTE = "minute";
+ public static final String SECOND = "second";
+ public static final String ROUND_VALUE = "roundValue";
+ public static final String SERIALIZER = "serializer";
+}
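
For reference, a minimal sketch of how these keys can be wired up through a Flume Context, mirroring what the tests later in this commit do. The class name, sink name, metastore URI, database, table and partition values below are placeholders, not taken from this patch.

import org.apache.flume.Context;
import org.apache.flume.conf.Configurables;
import org.apache.flume.sink.hive.Config;
import org.apache.flume.sink.hive.HiveSink;

public class HiveSinkConfigSketch {
  public static void main(String[] args) {
    // All values below are hypothetical; only the key names come from Config above.
    Context context = new Context();
    context.put(Config.HIVE_METASTORE, "thrift://localhost:9083");
    context.put(Config.HIVE_DATABASE, "logsdb");
    context.put(Config.HIVE_TABLE, "weblogs");
    context.put(Config.HIVE_PARTITION, "asia,india");
    context.put(Config.BATCH_SIZE, "1000");
    context.put(Config.SERIALIZER, "DELIMITED");
    context.put("serializer.fieldnames", "id,,msg");

    HiveSink sink = new HiveSink();
    sink.setName("hive-sink-1");
    Configurables.configure(sink, context);  // invokes HiveSink.configure(context)
  }
}
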
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java
new file mode 100644
index 0000000..8f64435
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+
+import java.io.IOException;
+
+/** Forwards the incoming event body to Hive unmodified
+ * Sets up the delimiter and the field to column mapping
+ */
+public class HiveDelimitedTextSerializer implements HiveEventSerializer {
+ public static final String ALIAS = "DELIMITED";
+
+ public static final String defaultDelimiter = ",";
+ public static final String SERIALIZER_DELIMITER = "serializer.delimiter";
+ public static final String SERIALIZER_FIELDNAMES = "serializer.fieldnames";
+ public static final String SERIALIZER_SERDE_SEPARATOR = "serializer.serdeSeparator";
+
+ private String delimiter;
+ private String[] fieldToColMapping = null;
+ private Character serdeSeparator = null;
+
+ @Override
+ public void write(TransactionBatch txnBatch, Event e)
+ throws StreamingException, IOException, InterruptedException {
+ txnBatch.write(e.getBody());
+ }
+
+ @Override
+ public RecordWriter createRecordWriter(HiveEndPoint endPoint)
+ throws StreamingException, IOException, ClassNotFoundException {
+ if (serdeSeparator == null) {
+ return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint);
+ }
+ return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint, null
+ , serdeSeparator);
+ }
+
+ @Override
+ public void configure(Context context) {
+ delimiter = parseDelimiterSpec(
+ context.getString(SERIALIZER_DELIMITER, defaultDelimiter) );
+ String fieldNames = context.getString(SERIALIZER_FIELDNAMES);
+ if (fieldNames == null) {
+ throw new IllegalArgumentException("serializer.fieldnames is not specified " +
+ "for serializer " + this.getClass().getName() );
+ }
+ String serdeSeparatorStr = context.getString(SERIALIZER_SERDE_SEPARATOR);
+ this.serdeSeparator = parseSerdeSeparatorSpec(serdeSeparatorStr);
+
+ // split, but preserve empty fields (-1)
+ fieldToColMapping = fieldNames.trim().split(",",-1);
+ }
+
+ // if the delimiter is double quoted, like "\t", drop the quotes
+ private static String parseDelimiterSpec(String delimiter) {
+ if (delimiter == null) {
+ return null;
+ }
+ if (delimiter.charAt(0) == '"' &&
+ delimiter.charAt(delimiter.length()-1) == '"') {
+ return delimiter.substring(1,delimiter.length()-1);
+ }
+ return delimiter;
+ }
+
+ // if the serde separator is a single quoted character like '\t', drop the quotes
+ private static Character parseSerdeSeparatorSpec(String separatorStr) {
+ if (separatorStr == null) {
+ return null;
+ }
+ if (separatorStr.length() == 1) {
+ return separatorStr.charAt(0);
+ }
+ if (separatorStr.length() == 3 &&
+ separatorStr.charAt(0) == '\'' &&
+ separatorStr.charAt(separatorStr.length()-1) == '\'') {
+ return separatorStr.charAt(1);
+ }
+
+ throw new IllegalArgumentException("serializer.serdeSeparator spec is invalid " +
+ "for " + ALIAS + " serializer " );
+ }
+
+}
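
For reference, a minimal sketch of the quoting conventions parsed above: a double-quoted serializer.delimiter has its quotes stripped, a single-quoted serializer.serdeSeparator is reduced to the single character, and empty entries in serializer.fieldnames drop the corresponding input fields (as the tests below do). The class name and field names are illustrative placeholders.

import org.apache.flume.Context;
import org.apache.flume.sink.hive.HiveDelimitedTextSerializer;

public class DelimitedSerializerSketch {
  public static void main(String[] args) {
    Context ctx = new Context();
    // Double-quoted delimiter spec: the surrounding quotes are dropped, so the delimiter is ","
    ctx.put("serializer.delimiter", "\",\"");
    // Empty entries in the field list drop the corresponding input fields
    ctx.put("serializer.fieldnames", "id,,msg,");
    // Single-quoted separator spec: parsed down to the single character ','
    ctx.put("serializer.serdeSeparator", "','");

    HiveDelimitedTextSerializer serializer = new HiveDelimitedTextSerializer();
    serializer.configure(ctx);
  }
}
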
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java
new file mode 100644
index 0000000..c233d3d
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+
+import java.io.IOException;
+
+public interface HiveEventSerializer extends Configurable {
+ public void write(TransactionBatch batch, Event e)
+ throws StreamingException, IOException, InterruptedException;
+
+ RecordWriter createRecordWriter(HiveEndPoint endPoint)
+ throws StreamingException, IOException, ClassNotFoundException;
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java
new file mode 100644
index 0000000..a75073f
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+
+import java.io.IOException;
+
+/** Forwards the incoming event body to Hive unmodified.
+ * The event body is expected to be a JSON record (written via StrictJsonWriter)
+ */
+
+public class HiveJsonSerializer implements HiveEventSerializer {
+ public static final String ALIAS = "JSON";
+
+ @Override
+ public void write(TransactionBatch txnBatch, Event e)
+ throws StreamingException, IOException, InterruptedException {
+ txnBatch.write(e.getBody());
+ }
+
+ @Override
+ public RecordWriter createRecordWriter(HiveEndPoint endPoint)
+ throws StreamingException, IOException, ClassNotFoundException {
+ return new StrictJsonWriter(endPoint);
+ }
+
+ @Override
+ public void configure(Context context) {
+ return;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
new file mode 100644
index 0000000..6fe332a
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.formatter.output.BucketPath;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.hive.hcatalog.streaming.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HiveSink extends AbstractSink implements Configurable {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(HiveSink.class);
+
+ private static final int DEFAULT_MAXOPENCONNECTIONS = 500;
+ private static final int DEFAULT_TXNSPERBATCH = 100;
+ private static final int DEFAULT_BATCHSIZE = 15000;
+ private static final int DEFAULT_CALLTIMEOUT = 10000;
+ private static final int DEFAULT_IDLETIMEOUT = 0;
+ private static final int DEFAULT_HEARTBEATINTERVAL = 240; // seconds
+
+
+ private Map<HiveEndPoint, HiveWriter> allWriters;
+
+ private SinkCounter sinkCounter;
+ private volatile int idleTimeout;
+ private String metaStoreUri;
+ private String proxyUser;
+ private String database;
+ private String table;
+ private List<String> partitionVals;
+ private Integer txnsPerBatchAsk;
+ private Integer batchSize;
+ private Integer maxOpenConnections;
+ private boolean autoCreatePartitions;
+ private String serializerType;
+ private HiveEventSerializer serializer;
+
+ /**
+ * Default timeout for blocking I/O calls in HiveWriter
+ */
+ private Integer callTimeout;
+ private Integer heartBeatInterval;
+
+ private ExecutorService callTimeoutPool;
+
+ private boolean useLocalTime;
+ private TimeZone timeZone;
+ private boolean needRounding;
+ private int roundUnit;
+ private Integer roundValue;
+
+ private Timer heartBeatTimer = new Timer();
+ private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+
+ @VisibleForTesting
+ Map<HiveEndPoint, HiveWriter> getAllWriters() {
+ return allWriters;
+ }
+
+ // read configuration and setup thresholds
+ @Override
+ public void configure(Context context) {
+
+ metaStoreUri = context.getString(Config.HIVE_METASTORE);
+ if (metaStoreUri == null) {
+ throw new IllegalArgumentException(Config.HIVE_METASTORE + " config setting is not " +
+ "specified for sink " + getName());
+ }
+ if (metaStoreUri.equalsIgnoreCase("null")) { // for testing support
+ metaStoreUri = null;
+ }
+ proxyUser = null; // context.getString("hive.proxyUser"); not supported by hive api yet
+ database = context.getString(Config.HIVE_DATABASE);
+ if (database == null) {
+ throw new IllegalArgumentException(Config.HIVE_DATABASE + " config setting is not " +
+ "specified for sink " + getName());
+ }
+ table = context.getString(Config.HIVE_TABLE);
+ if (table == null) {
+ throw new IllegalArgumentException(Config.HIVE_TABLE + " config setting is not " +
+ "specified for sink " + getName());
+ }
+
+ String partitions = context.getString(Config.HIVE_PARTITION);
+ if (partitions != null) {
+ partitionVals = Arrays.asList(partitions.split(","));
+ }
+
+
+ txnsPerBatchAsk = context.getInteger(Config.HIVE_TXNS_PER_BATCH_ASK, DEFAULT_TXNSPERBATCH);
+ if (txnsPerBatchAsk < 0) {
+ LOG.warn(getName() + ". hive.txnsPerBatchAsk must be a positive number. Defaulting to "
+ + DEFAULT_TXNSPERBATCH);
+ txnsPerBatchAsk = DEFAULT_TXNSPERBATCH;
+ }
+ batchSize = context.getInteger(Config.BATCH_SIZE, DEFAULT_BATCHSIZE);
+ if (batchSize < 0) {
+ LOG.warn(getName() + ". batchSize must be a positive number. Defaulting to "
+ + DEFAULT_BATCHSIZE);
+ batchSize = DEFAULT_BATCHSIZE;
+ }
+ idleTimeout = context.getInteger(Config.IDLE_TIMEOUT, DEFAULT_IDLETIMEOUT);
+ if (idleTimeout < 0) {
+ LOG.warn(getName() + ". idleTimeout must be a positive number. Defaulting to "
+ + DEFAULT_IDLETIMEOUT);
+ idleTimeout = DEFAULT_IDLETIMEOUT;
+ }
+ callTimeout = context.getInteger(Config.CALL_TIMEOUT, DEFAULT_CALLTIMEOUT);
+ if (callTimeout < 0) {
+ LOG.warn(getName() + ". callTimeout must be a positive number. Defaulting to "
+ + DEFAULT_CALLTIMEOUT);
+ callTimeout = DEFAULT_CALLTIMEOUT;
+ }
+
+ heartBeatInterval = context.getInteger(Config.HEART_BEAT_INTERVAL, DEFAULT_HEARTBEATINTERVAL);
+ if (heartBeatInterval < 0) {
+ LOG.warn(getName() + ". heartBeatInterval must be a positive number. Defaulting to "
+ + DEFAULT_HEARTBEATINTERVAL);
+ heartBeatInterval = DEFAULT_HEARTBEATINTERVAL;
+ }
+ maxOpenConnections = context.getInteger(Config.MAX_OPEN_CONNECTIONS, DEFAULT_MAXOPENCONNECTIONS);
+ autoCreatePartitions = context.getBoolean("autoCreatePartitions", true);
+
+ // Timestamp processing
+ useLocalTime = context.getBoolean(Config.USE_LOCAL_TIME_STAMP, false);
+
+ String tzName = context.getString(Config.TIME_ZONE);
+ timeZone = (tzName == null) ? null : TimeZone.getTimeZone(tzName);
+ needRounding = context.getBoolean(Config.ROUND, false);
+
+ String unit = context.getString(Config.ROUND_UNIT, Config.MINUTE);
+ if (unit.equalsIgnoreCase(Config.HOUR)) {
+ this.roundUnit = Calendar.HOUR_OF_DAY;
+ } else if (unit.equalsIgnoreCase(Config.MINUTE)) {
+ this.roundUnit = Calendar.MINUTE;
+ } else if (unit.equalsIgnoreCase(Config.SECOND)){
+ this.roundUnit = Calendar.SECOND;
+ } else {
+ LOG.warn(getName() + ". Rounding unit is not valid, please set one of " +
+ "minute, hour or second. Rounding will be disabled");
+ needRounding = false;
+ }
+ this.roundValue = context.getInteger(Config.ROUND_VALUE, 1);
+ if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){
+ Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
+ "Round value must be > 0 and <= 60");
+ } else if (roundUnit == Calendar.HOUR_OF_DAY){
+ Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
+ "Round value must be > 0 and <= 24");
+ }
+
+ // Serializer
+ serializerType = context.getString(Config.SERIALIZER, "");
+ if (serializerType.isEmpty()) {
+ throw new IllegalArgumentException("serializer config setting is not " +
+ "specified for sink " + getName());
+ }
+
+ serializer = createSerializer(serializerType);
+ serializer.configure(context);
+
+ Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");
+
+ if (sinkCounter == null) {
+ sinkCounter = new SinkCounter(getName());
+ }
+ }
+
+ @VisibleForTesting
+ protected SinkCounter getCounter() {
+ return sinkCounter;
+ }
+ private HiveEventSerializer createSerializer(String serializerName) {
+ if(serializerName.compareToIgnoreCase(HiveDelimitedTextSerializer.ALIAS) == 0 ||
+ serializerName.compareTo(HiveDelimitedTextSerializer.class.getName()) == 0) {
+ return new HiveDelimitedTextSerializer();
+ } else if (serializerName.compareToIgnoreCase(HiveJsonSerializer.ALIAS) == 0 ||
+ serializerName.compareTo(HiveJsonSerializer.class.getName()) == 0) {
+ return new HiveJsonSerializer();
+ }
+
+ try {
+ return (HiveEventSerializer) Class.forName(serializerName).newInstance();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Unable to instantiate serializer: " + serializerName
+ + " on sink: " + getName(), e);
+ }
+ }
+
+
+ /**
+ * Pull events out of channel, find corresponding HiveWriter and write to it.
+ * Take at most batchSize events per Transaction. <br/>
+ * This method is not thread safe.
+ */
+ public Status process() throws EventDeliveryException {
+ // writers used in this Txn
+
+ Channel channel = getChannel();
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ boolean success = false;
+ try {
+ // 1 Enable Heart Beats
+ if (timeToSendHeartBeat.compareAndSet(true, false)) {
+ enableHeartBeatOnAllWriters();
+ }
+
+ // 2 Drain Batch
+ int txnEventCount = drainOneBatch(channel);
+ transaction.commit();
+ success = true;
+
+ // 3 Update Counters
+ if (txnEventCount < 1) {
+ return Status.BACKOFF;
+ } else {
+ return Status.READY;
+ }
+ } catch (InterruptedException err) {
+ LOG.warn(getName() + ": Thread was interrupted.", err);
+ return Status.BACKOFF;
+ } catch (Exception e) {
+ throw new EventDeliveryException(e);
+ } finally {
+ if (!success) {
+ transaction.rollback();
+ }
+ transaction.close();
+ }
+ }
+
+ // Drains one batch of events from Channel into Hive
+ private int drainOneBatch(Channel channel)
+ throws HiveWriter.Failure, InterruptedException {
+ int txnEventCount = 0;
+ try {
+ Map<HiveEndPoint,HiveWriter> activeWriters = Maps.newHashMap();
+ for (; txnEventCount < batchSize; ++txnEventCount) {
+ // 0) Read event from Channel
+ Event event = channel.take();
+ if (event == null) {
+ break;
+ }
+
+ //1) Create end point by substituting place holders
+ HiveEndPoint endPoint = makeEndPoint(metaStoreUri, database, table,
+ partitionVals, event.getHeaders(), timeZone,
+ needRounding, roundUnit, roundValue, useLocalTime);
+
+ //2) Create or reuse Writer
+ HiveWriter writer = getOrCreateWriter(activeWriters, endPoint);
+
+ //3) Write
+ LOG.debug("{} : Writing event to {}", getName(), endPoint);
+ writer.write(event);
+
+ } // for
+
+ //4) Update counters
+ if (txnEventCount == 0) {
+ sinkCounter.incrementBatchEmptyCount();
+ } else if (txnEventCount == batchSize) {
+ sinkCounter.incrementBatchCompleteCount();
+ } else {
+ sinkCounter.incrementBatchUnderflowCount();
+ }
+ sinkCounter.addToEventDrainAttemptCount(txnEventCount);
+
+
+ // 5) Flush all Writers
+ for (HiveWriter writer : activeWriters.values()) {
+ writer.flush(true);
+ }
+
+ sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+ return txnEventCount;
+ } catch (HiveWriter.Failure e) {
+ LOG.warn(getName() + " : " + e.getMessage(), e);
+ abortAllWriters();
+ closeAllWriters();
+ throw e;
+ }
+ }
+
+ private void enableHeartBeatOnAllWriters() {
+ for (HiveWriter writer : allWriters.values()) {
+ writer.setHearbeatNeeded();
+ }
+ }
+
+ private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> activeWriters,
+ HiveEndPoint endPoint)
+ throws HiveWriter.ConnectException, InterruptedException {
+ try {
+ HiveWriter writer = allWriters.get( endPoint );
+ if (writer == null) {
+ LOG.info(getName() + ": Creating Writer to Hive end point : " + endPoint);
+ writer = new HiveWriter(endPoint, txnsPerBatchAsk, autoCreatePartitions,
+ callTimeout, callTimeoutPool, proxyUser, serializer, sinkCounter);
+
+ sinkCounter.incrementConnectionCreatedCount();
+ if (allWriters.size() > maxOpenConnections){
+ int retired = closeIdleWriters();
+ if (retired == 0) {
+ closeEldestWriter();
+ }
+ }
+ allWriters.put(endPoint, writer);
+ activeWriters.put(endPoint, writer);
+ }
+ else {
+ if (activeWriters.get(endPoint) == null) {
+ activeWriters.put(endPoint,writer);
+ }
+ }
+ return writer;
+ } catch (HiveWriter.ConnectException e) {
+ sinkCounter.incrementConnectionFailedCount();
+ throw e;
+ }
+
+ }
+
+ private HiveEndPoint makeEndPoint(String metaStoreUri, String database, String table,
+ List<String> partVals, Map<String, String> headers,
+ TimeZone timeZone, boolean needRounding,
+ int roundUnit, Integer roundValue,
+ boolean useLocalTime) {
+ if (partVals == null) {
+ return new HiveEndPoint(metaStoreUri, database, table, null);
+ }
+
+ ArrayList<String> realPartVals = Lists.newArrayList();
+ for (String partVal : partVals) {
+ realPartVals.add(BucketPath.escapeString(partVal, headers, timeZone,
+ needRounding, roundUnit, roundValue, useLocalTime));
+ }
+ return new HiveEndPoint(metaStoreUri, database, table, realPartVals);
+ }
+
+ /**
+ * Locate writer that has not been used for longest time and retire it
+ */
+ private void closeEldestWriter() throws InterruptedException {
+ long oldestTimeStamp = System.currentTimeMillis();
+ HiveEndPoint eldest = null;
+ for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+ if (entry.getValue().getLastUsed() < oldestTimeStamp) {
+ eldest = entry.getKey();
+ oldestTimeStamp = entry.getValue().getLastUsed();
+ }
+ }
+
+ try {
+ sinkCounter.incrementConnectionClosedCount();
+ LOG.info(getName() + ": Closing least used Writer to Hive EndPoint : " + eldest);
+ allWriters.remove(eldest).close();
+ } catch (InterruptedException e) {
+ LOG.warn(getName() + ": Interrupted when attempting to close writer for end point: "
+ + eldest, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Locate all writers past idle timeout and retire them
+ * @return number of writers retired
+ */
+ private int closeIdleWriters() throws InterruptedException {
+ int count = 0;
+ long now = System.currentTimeMillis();
+ ArrayList<HiveEndPoint> retirees = Lists.newArrayList();
+
+ //1) Find retirement candidates
+ for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+ if (now - entry.getValue().getLastUsed() > idleTimeout) {
+ ++count;
+ retirees.add(entry.getKey());
+ }
+ }
+ //2) Retire them
+ for(HiveEndPoint ep : retirees) {
+ sinkCounter.incrementConnectionClosedCount();
+ LOG.info(getName() + ": Closing idle Writer to Hive end point : {}", ep);
+ allWriters.remove(ep).close();
+ }
+ return count;
+ }
+
+ /**
+ * Closes all writers and removes them from the cache
+ */
+ private void closeAllWriters() throws InterruptedException {
+ //1) Retire writers
+ for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+ entry.getValue().close();
+ }
+
+ //2) Clear cache
+ allWriters.clear();
+ }
+
+ /**
+ * Aborts the current Txn on all writers
+ */
+ private void abortAllWriters() throws InterruptedException {
+ for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+ entry.getValue().abort();
+ }
+ }
+
+ @Override
+ public void stop() {
+ // do not constrain close() calls with a timeout
+ for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+ try {
+ HiveWriter w = entry.getValue();
+ LOG.info("Closing connection to {}", w);
+ w.closeConnection();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ // shut down all thread pools
+ callTimeoutPool.shutdown();
+ try {
+ while (callTimeoutPool.isTerminated() == false) {
+ callTimeoutPool.awaitTermination(
+ Math.max(DEFAULT_CALLTIMEOUT, callTimeout), TimeUnit.MILLISECONDS);
+ }
+ } catch (InterruptedException ex) {
+ LOG.warn(getName() + ":Shutdown interrupted on " + callTimeoutPool, ex);
+ }
+
+ callTimeoutPool = null;
+ allWriters.clear();
+ allWriters = null;
+ sinkCounter.stop();
+ super.stop();
+ LOG.info("Hive Sink {} stopped", getName() );
+ }
+
+ @Override
+ public void start() {
+ String timeoutName = "hive-" + getName() + "-call-runner-%d";
+ // call timeout pool needs only 1 thread as the sink is effectively single threaded
+ callTimeoutPool = Executors.newFixedThreadPool(1,
+ new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+ this.allWriters = Maps.newHashMap();
+ sinkCounter.start();
+ super.start();
+ setupHeartBeatTimer();
+ LOG.info("Hive Sink {} started", getName());
+ }
+
+ private void setupHeartBeatTimer() {
+ if (heartBeatInterval > 0) {
+ heartBeatTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ timeToSendHeartBeat.set(true);
+ setupHeartBeatTimer();
+ }
+ }, heartBeatInterval * 1000);
+ }
+ }
+
+
+ @Override
+ public String toString() {
+ return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
+ " }";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
new file mode 100644
index 0000000..4a06feb
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hive.hcatalog.streaming.*;
+
+import org.apache.flume.Event;
+
+import org.apache.flume.instrumentation.SinkCounter;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Internal API intended for HiveSink use.
+ */
+class HiveWriter {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(HiveWriter.class);
+
+ private final HiveEndPoint endPoint;
+ private HiveEventSerializer serializer;
+ private final StreamingConnection connection;
+ private final int txnsPerBatch;
+ private final RecordWriter recordWriter;
+ private TransactionBatch txnBatch;
+
+ private final ExecutorService callTimeoutPool;
+
+ private final long callTimeout;
+
+ private long lastUsed; // time of last flush on this writer
+
+ private SinkCounter sinkCounter;
+ private int batchCounter;
+ private long eventCounter;
+ private long processSize;
+
+ protected boolean closed; // flag indicating HiveWriter was closed
+ private boolean autoCreatePartitions;
+
+ private boolean hearbeatNeeded = false;
+
+ HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
+ boolean autoCreatePartitions, long callTimeout,
+ ExecutorService callTimeoutPool, String hiveUser,
+ HiveEventSerializer serializer, SinkCounter sinkCounter)
+ throws ConnectException, InterruptedException {
+ try {
+ this.autoCreatePartitions = autoCreatePartitions;
+ this.sinkCounter = sinkCounter;
+ this.callTimeout = callTimeout;
+ this.callTimeoutPool = callTimeoutPool;
+ this.endPoint = endPoint;
+ this.connection = newConnection(hiveUser);
+ this.txnsPerBatch = txnsPerBatch;
+ this.serializer = serializer;
+ this.recordWriter = serializer.createRecordWriter(endPoint);
+ this.txnBatch = nextTxnBatch(recordWriter);
+ this.closed = false;
+ this.lastUsed = System.currentTimeMillis();
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ConnectException(endPoint, e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return endPoint.toString();
+ }
+
+ /**
+ * Clear the class counters
+ */
+ private void resetCounters() {
+ eventCounter = 0;
+ processSize = 0;
+ batchCounter = 0;
+ }
+
+ void setHearbeatNeeded() {
+ hearbeatNeeded = true;
+ }
+
+
+ /**
+ * Write data, update stats
+ * @param event
+ * @throws StreamingException
+ * @throws InterruptedException
+ */
+ public synchronized void write(final Event event)
+ throws WriteException, InterruptedException {
+ if (closed) {
+ throw new IllegalStateException("Writer closed. Cannot write to : " + endPoint);
+ }
+
+ // write the event
+ try {
+ timedCall(new CallRunner1<Void>() {
+ @Override
+ public Void call() throws InterruptedException, StreamingException {
+ try {
+ serializer.write(txnBatch, event);
+ return null;
+ } catch (IOException e) {
+ throw new StreamingIOFailure(e.getMessage(), e);
+ }
+ }
+ });
+ } catch (StreamingException e) {
+ throw new WriteException(endPoint, txnBatch.getCurrentTxnId(), e);
+ } catch (TimeoutException e) {
+ throw new WriteException(endPoint, txnBatch.getCurrentTxnId(), e);
+ }
+
+ // Update Statistics
+ processSize += event.getBody().length;
+ eventCounter++;
+ }
+
+ /**
+ * Commits the current Txn.
+ * If 'rollToNext' is true, will switch to next Txn in batch or to a
+ * new TxnBatch if current Txn batch is exhausted
+ */
+ public void flush(boolean rollToNext)
+ throws CommitException, TxnBatchException, TxnFailure, InterruptedException {
+ //0 Heart beat on TxnBatch
+ if(hearbeatNeeded) {
+ hearbeatNeeded = false;
+ heartBeat();
+ }
+ lastUsed = System.currentTimeMillis();
+
+ try {
+ //1 commit txn & close batch if needed
+ commitTxn();
+ if(txnBatch.remainingTransactions() == 0) {
+ closeTxnBatch();
+ txnBatch = null;
+ if(rollToNext) {
+ txnBatch = nextTxnBatch(recordWriter);
+ }
+ }
+
+ //2 roll to next Txn
+ if(rollToNext) {
+ LOG.debug("Switching to next Txn for {}", endPoint);
+ txnBatch.beginNextTransaction(); // does not block
+ }
+ } catch (StreamingException e) {
+ throw new TxnFailure(txnBatch, e);
+ }
+ }
+
+ /**
+ * Aborts the current Txn.
+ * @throws InterruptedException if interrupted while aborting the Txn
+ */
+ public void abort() throws InterruptedException {
+ abortTxn();
+ }
+
+ /** Sends a heartbeat on the current and remaining txns in the batch via the
+ * call timeout pool. Heartbeat failures are logged and suppressed.
+ */
+ public void heartBeat() throws InterruptedException {
+ // 1) schedule the heartbeat on one thread in pool
+ try {
+ timedCall(new CallRunner1<Void>() {
+ @Override
+ public Void call() throws StreamingException {
+ LOG.info("Sending heartbeat on batch " + txnBatch);
+ txnBatch.heartbeat();
+ return null;
+ }
+ });
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
+ // Suppressing exceptions as we don't care for errors on heartbeats
+ }
+ }
+
+ /**
+ * Close the Transaction Batch and connection
+ * @throws InterruptedException
+ */
+ public void close() throws InterruptedException {
+ closeTxnBatch();
+ closeConnection();
+ closed = true;
+ }
+
+ public void closeConnection() throws InterruptedException {
+ LOG.info("Closing connection to EndPoint : {}", endPoint);
+ try {
+ timedCall(new CallRunner1<Void>() {
+ @Override
+ public Void call() {
+ connection.close(); // could block
+ return null;
+ }
+ });
+ sinkCounter.incrementConnectionClosedCount();
+ } catch (Exception e) {
+ LOG.warn("Error closing connection to EndPoint : " + endPoint, e);
+ // Suppressing exceptions as we don't care for errors on connection close
+ }
+ }
+
+ private void commitTxn() throws CommitException, InterruptedException {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Committing Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint);
+ }
+ try {
+ timedCall(new CallRunner1<Void>() {
+ @Override
+ public Void call() throws StreamingException, InterruptedException {
+ txnBatch.commit(); // could block
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ throw new CommitException(endPoint, txnBatch.getCurrentTxnId(), e);
+ }
+ }
+
+ private void abortTxn() throws InterruptedException {
+ LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
+ try {
+ timedCall(new CallRunner1<Void>() {
+ @Override
+ public Void call() throws StreamingException, InterruptedException {
+ txnBatch.abort(); // could block
+ return null;
+ }
+ });
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (TimeoutException e) {
+ LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+ } catch (Exception e) {
+ LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+ // Suppressing exceptions as we don't care for errors on abort
+ }
+ }
+
+ private StreamingConnection newConnection(final String proxyUser)
+ throws InterruptedException, ConnectException {
+ try {
+ return timedCall(new CallRunner1<StreamingConnection>() {
+ @Override
+ public StreamingConnection call() throws InterruptedException, StreamingException {
+ return endPoint.newConnection(autoCreatePartitions); // could block
+ }
+ });
+ } catch (Exception e) {
+ throw new ConnectException(endPoint, e);
+ }
+ }
+
+ private TransactionBatch nextTxnBatch(final RecordWriter recordWriter)
+ throws InterruptedException, TxnBatchException {
+ LOG.debug("Fetching new Txn Batch for {}", endPoint);
+ TransactionBatch batch = null;
+ try {
+ batch = timedCall(new CallRunner1<TransactionBatch>() {
+ @Override
+ public TransactionBatch call() throws InterruptedException, StreamingException {
+ return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
+ }
+ });
+ LOG.info("Acquired Txn Batch {}. Switching to first txn", batch);
+ batch.beginNextTransaction();
+ } catch (Exception e) {
+ throw new TxnBatchException(endPoint, e);
+ }
+ return batch;
+ }
+
+ private void closeTxnBatch() throws InterruptedException {
+ try {
+ LOG.debug("Closing Txn Batch {}", txnBatch);
+ timedCall(new CallRunner1<Void>() {
+ @Override
+ public Void call() throws InterruptedException, StreamingException {
+ txnBatch.close(); // could block
+ return null;
+ }
+ });
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.warn("Error closing Txn Batch " + txnBatch, e);
+ // Suppressing exceptions as we don't care for errors on batch close
+ }
+ }
+
+ private <T> T timedCall(final CallRunner1<T> callRunner)
+ throws TimeoutException, InterruptedException, StreamingException {
+ Future<T> future = callTimeoutPool.submit(new Callable<T>() {
+ @Override
+ public T call() throws StreamingException, InterruptedException {
+ return callRunner.call();
+ }
+ });
+
+ try {
+ if (callTimeout > 0) {
+ return future.get(callTimeout, TimeUnit.MILLISECONDS);
+ } else {
+ return future.get();
+ }
+ } catch (TimeoutException eT) {
+ future.cancel(true);
+ sinkCounter.incrementConnectionFailedCount();
+ throw eT;
+ } catch (ExecutionException e1) {
+ sinkCounter.incrementConnectionFailedCount();
+ Throwable cause = e1.getCause();
+ if (cause instanceof IOException ) {
+ throw new StreamingIOFailure("I/O Failure", (IOException) cause);
+ } else if (cause instanceof StreamingException) {
+ throw (StreamingException) cause;
+ } else if (cause instanceof TimeoutException) {
+ throw new StreamingException("Operation Timed Out.", (TimeoutException) cause);
+ } else if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else if (cause instanceof InterruptedException) {
+ throw (InterruptedException) cause;
+ }
+ throw new StreamingException(e1.getMessage(), e1);
+ }
+ }
+
+ long getLastUsed() {
+ return lastUsed;
+ }
+
+ /**
+ * Simple interface whose <tt>call</tt> method is intended to be run
+ * on the call timeout pool in a separate thread.
+ * @param <T>
+ */
+ private interface CallRunner<T> {
+ T call() throws Exception;
+ }
+
+
+ private interface CallRunner1<T> {
+ T call() throws StreamingException, InterruptedException;
+ }
+
+
+ public static class Failure extends Exception {
+ public Failure(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+ }
+
+ public static class WriteException extends Failure {
+ public WriteException(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) {
+ super("Failed writing to : " + endPoint + ". TxnID : " + currentTxnId, cause);
+ }
+ }
+
+ public static class CommitException extends Failure {
+ public CommitException(HiveEndPoint endPoint, Long txnID, Throwable cause) {
+ super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause);
+ }
+ }
+
+ public static class ConnectException extends Failure {
+ public ConnectException(HiveEndPoint ep, Throwable cause) {
+ super("Failed connecting to EndPoint " + ep, cause);
+ }
+ }
+
+ public static class TxnBatchException extends Failure {
+ public TxnBatchException(HiveEndPoint ep, Throwable cause) {
+ super("Failed acquiring Transaction Batch from EndPoint: " + ep, cause);
+ }
+ }
+
+ private class TxnFailure extends Failure {
+ public TxnFailure(TransactionBatch txnBatch, Throwable cause) {
+ super("Failed switching to next Txn in TxnBatch " + txnBatch, cause);
+ }
+ }
+}
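
For reference, HiveWriter bounds every potentially blocking Hive Streaming call (connect, write, commit, abort, heartbeat, close) with timedCall, which runs the call on the sink's single-threaded call timeout pool and waits with Future.get. A minimal standalone sketch of that submit / get-with-timeout / cancel pattern; the class name and the placeholder Callable are illustrative only.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedCallSketch {
  // Run a potentially blocking call on a separate thread and bound it with a timeout.
  static <T> T timedCall(ExecutorService pool, long timeoutMs, Callable<T> work)
      throws TimeoutException, InterruptedException, ExecutionException {
    Future<T> future = pool.submit(work);
    try {
      return timeoutMs > 0 ? future.get(timeoutMs, TimeUnit.MILLISECONDS) : future.get();
    } catch (TimeoutException e) {
      future.cancel(true);  // interrupt the blocked call, as HiveWriter does
      throw e;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    try {
      // Stand-in for a blocking operation such as txnBatch.commit()
      String result = timedCall(pool, 10000, () -> "ok");
      System.out.println(result);
    } finally {
      pool.shutdown();
    }
  }
}
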
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
new file mode 100644
index 0000000..46724f2
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.flume.sink.hive;
+
+import com.google.common.collect.Lists;
+import junit.framework.Assert;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.UUID;
+
+public class TestHiveSink {
+ // 1) partitioned table
+ final static String dbName = "testing";
+ final static String tblName = "alerts";
+
+ public static final String PART1_NAME = "continent";
+ public static final String PART2_NAME = "country";
+ public static final String[] partNames = { PART1_NAME, PART2_NAME };
+
+ private static final String COL1 = "id";
+ private static final String COL2 = "msg";
+ final String[] colNames = {COL1,COL2};
+ private String[] colTypes = { "int", "string" };
+
+ private static final String PART1_VALUE = "Asia";
+ private static final String PART2_VALUE = "India";
+ private final ArrayList<String> partitionVals;
+
+ // 2) un-partitioned table
+ final static String dbName2 = "testing2";
+ final static String tblName2 = "alerts2";
+ final String[] colNames2 = {COL1,COL2};
+ private String[] colTypes2 = { "int", "string" };
+
+ HiveSink sink = new HiveSink();
+
+ private final HiveConf conf;
+
+ private final Driver driver;
+
+ final String metaStoreURI;
+
+ @Rule
+ public TemporaryFolder dbFolder = new TemporaryFolder();
+
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class);
+
+ public TestHiveSink() throws Exception {
+ partitionVals = new ArrayList<String>(2);
+ partitionVals.add(PART1_VALUE);
+ partitionVals.add(PART2_VALUE);
+
+ metaStoreURI = "null";
+
+ conf = new HiveConf(this.getClass());
+ TestUtil.setConfValues(conf);
+
+ // 1) prepare hive
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+
+ // 2) Setup Hive client
+ SessionState.start(new CliSessionState(conf));
+ driver = new Driver(conf);
+
+ }
+
+
+ @Before
+ public void setUp() throws Exception {
+ TestUtil.dropDB(conf, dbName);
+
+ sink = new HiveSink();
+ sink.setName("HiveSink-" + UUID.randomUUID().toString());
+
+ String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+ TestUtil.createDbAndTable(driver, dbName, tblName, partitionVals, colNames,
+ colTypes, partNames, dbLocation);
+ }
+
+ @After
+ public void tearDown() throws MetaException, HiveException {
+ TestUtil.dropDB(conf, dbName);
+ }
+
+
+ @Test
+ public void testSingleWriterSimplePartitionedTable()
+ throws EventDeliveryException, IOException, CommandNeedRetryException {
+ int totalRecords = 4;
+ int batchSize = 2;
+ int batchCount = totalRecords / batchSize;
+
+ Context context = new Context();
+ context.put("hive.metastore", metaStoreURI);
+ context.put("hive.database",dbName);
+ context.put("hive.table",tblName);
+ context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+ context.put("autoCreatePartitions","false");
+ context.put("batchSize","" + batchSize);
+ context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
+ context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+ context.put("heartBeatInterval", "0");
+
+ Channel channel = startSink(sink, context);
+
+ List<String> bodies = Lists.newArrayList();
+
+ // push the events in two batches
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (int j = 1; j <= totalRecords; j++) {
+ Event event = new SimpleEvent();
+ String body = j + ",blah,This is a log message,other stuff";
+ event.setBody(body.getBytes());
+ bodies.add(body);
+ channel.put(event);
+ }
+ // execute sink to process the events
+ txn.commit();
+ txn.close();
+
+
+ checkRecordCountInTable(0, dbName, tblName);
+ for (int i = 0; i < batchCount ; i++) {
+ sink.process();
+ }
+ sink.stop();
+ checkRecordCountInTable(totalRecords, dbName, tblName);
+ }
+
+ @Test
+ public void testSingleWriterSimpleUnPartitionedTable()
+ throws Exception {
+ TestUtil.dropDB(conf, dbName2);
+ String dbLocation = dbFolder.newFolder(dbName2).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+ TestUtil.createDbAndTable(driver, dbName2, tblName2, null, colNames2, colTypes2
+ , null, dbLocation);
+
+ try {
+ int totalRecords = 4;
+ int batchSize = 2;
+ int batchCount = totalRecords / batchSize;
+
+ Context context = new Context();
+ context.put("hive.metastore", metaStoreURI);
+ context.put("hive.database", dbName2);
+ context.put("hive.table", tblName2);
+ context.put("autoCreatePartitions","false");
+ context.put("batchSize","" + batchSize);
+ context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
+ context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+ context.put("heartBeatInterval", "0");
+
+ Channel channel = startSink(sink, context);
+
+ List<String> bodies = Lists.newArrayList();
+
+ // Push all events into the channel in one transaction; the sink drains them in two batches
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (int j = 1; j <= totalRecords; j++) {
+ Event event = new SimpleEvent();
+ String body = j + ",blah,This is a log message,other stuff";
+ event.setBody(body.getBytes());
+ bodies.add(body);
+ channel.put(event);
+ }
+
+ txn.commit();
+ txn.close();
+
+ checkRecordCountInTable(0, dbName2, tblName2);
+ for (int i = 0; i < batchCount ; i++) {
+ sink.process();
+ }
+
+ // check before & after stopping sink
+ checkRecordCountInTable(totalRecords, dbName2, tblName2);
+ sink.stop();
+ checkRecordCountInTable(totalRecords, dbName2, tblName2);
+ } finally {
+ TestUtil.dropDB(conf, dbName2);
+ }
+ }
+
+ @Test
+ public void testSingleWriterUseHeaders()
+ throws Exception {
+ String[] colNames = {COL1, COL2};
+ String PART1_NAME = "country";
+ String PART2_NAME = "hour";
+ String[] partNames = {PART1_NAME, PART2_NAME};
+ List<String> partitionVals = null;
+ String PART1_VALUE = "%{" + PART1_NAME + "}";
+ String PART2_VALUE = "%y-%m-%d-%k";
+ partitionVals = new ArrayList<String>(2);
+ partitionVals.add(PART1_VALUE);
+ partitionVals.add(PART2_VALUE);
+
+ String tblName = "hourlydata";
+ TestUtil.dropDB(conf, dbName2);
+ String dbLocation = dbFolder.newFolder(dbName2).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+ TestUtil.createDbAndTable(driver, dbName2, tblName, partitionVals, colNames,
+ colTypes, partNames, dbLocation);
+
+ int totalRecords = 4;
+ int batchSize = 2;
+ int batchCount = totalRecords / batchSize;
+
+ Context context = new Context();
+ context.put("hive.metastore",metaStoreURI);
+ context.put("hive.database",dbName2);
+ context.put("hive.table",tblName);
+ context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+ context.put("autoCreatePartitions","true");
+ context.put("useLocalTimeStamp", "false");
+ context.put("batchSize","" + batchSize);
+ context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
+ context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+ context.put("heartBeatInterval", "0");
+
+ Channel channel = startSink(sink, context);
+
+ Calendar eventDate = Calendar.getInstance();
+ List<String> bodies = Lists.newArrayList();
+
+ // push the events; timestamps alternate between two hours so two hourly partitions are used
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (int j = 1; j <= totalRecords; j++) {
+ Event event = new SimpleEvent();
+ String body = j + ",blah,This is a log message,other stuff";
+ event.setBody(body.getBytes());
+ eventDate.clear();
+ eventDate.set(2014, 03, 03, j % batchCount, 1); // year, month (0-based), day, hour, minute
+ event.getHeaders().put( "timestamp",
+ String.valueOf(eventDate.getTimeInMillis()) );
+ event.getHeaders().put( PART1_NAME, "Asia" );
+ bodies.add(body);
+ channel.put(event);
+ }
+ // execute sink to process the events
+ txn.commit();
+ txn.close();
+
+ checkRecordCountInTable(0, dbName2, tblName);
+ for (int i = 0; i < batchCount ; i++) {
+ sink.process();
+ }
+ checkRecordCountInTable(totalRecords, dbName2, tblName);
+ sink.stop();
+
+ // verify counters
+ SinkCounter counter = sink.getCounter();
+ Assert.assertEquals(2, counter.getConnectionCreatedCount());
+ Assert.assertEquals(2, counter.getConnectionClosedCount());
+ Assert.assertEquals(2, counter.getBatchCompleteCount());
+ Assert.assertEquals(0, counter.getBatchEmptyCount());
+ Assert.assertEquals(0, counter.getConnectionFailedCount() );
+ Assert.assertEquals(4, counter.getEventDrainAttemptCount());
+ Assert.assertEquals(4, counter.getEventDrainSuccessCount() );
+
+ }
+
+ @Test
+ public void testHeartBeat()
+ throws EventDeliveryException, IOException, CommandNeedRetryException {
+ int batchSize = 2;
+ int batchCount = 3;
+ int totalRecords = batchCount*batchSize;
+ Context context = new Context();
+ context.put("hive.metastore", metaStoreURI);
+ context.put("hive.database", dbName);
+ context.put("hive.table", tblName);
+ context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+ context.put("autoCreatePartitions","true");
+ context.put("batchSize","" + batchSize);
+ context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
+ context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+ context.put("hive.txnsPerBatchAsk", "20");
+ context.put("heartBeatInterval", "3"); // heartbeat in seconds
+
+ Channel channel = startSink(sink, context);
+
+ List<String> bodies = Lists.newArrayList();
+
+ // push each batch into the channel, process it, then wait for a heartbeat
+ for (int i = 0; i < batchCount; i++) {
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (int j = 1; j <= batchSize; j++) {
+ Event event = new SimpleEvent();
+ String body = i*j + ",blah,This is a log message,other stuff";
+ event.setBody(body.getBytes());
+ bodies.add(body);
+ channel.put(event);
+ }
+ // execute sink to process the events
+ txn.commit();
+ txn.close();
+
+ sink.process();
+ sleep(3000); // allow heartbeat to happen
+ }
+
+ sink.stop();
+ checkRecordCountInTable(totalRecords, dbName, tblName);
+ }
+
+ @Test
+ public void testJsonSerializer() throws Exception {
+ int batchSize = 2;
+ int batchCount = 2;
+ int totalRecords = batchCount*batchSize;
+ Context context = new Context();
+ context.put("hive.metastore",metaStoreURI);
+ context.put("hive.database",dbName);
+ context.put("hive.table",tblName);
+ context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+ context.put("autoCreatePartitions","true");
+ context.put("batchSize","" + batchSize);
+ context.put("serializer", HiveJsonSerializer.ALIAS);
+ context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+ context.put("heartBeatInterval", "0");
+
+ Channel channel = startSink(sink, context);
+
+ List<String> bodies = Lists.newArrayList();
+
+ // push the events in two batches
+ for (int i = 0; i < batchCount; i++) {
+ Transaction txn = channel.getTransaction();
+ txn.begin();
+ for (int j = 1; j <= batchSize; j++) {
+ Event event = new SimpleEvent();
+ String body = "{\"id\" : 1, \"msg\" : \"using json serializer\"}";
+ event.setBody(body.getBytes());
+ bodies.add(body);
+ channel.put(event);
+ }
+ // execute sink to process the events
+ txn.commit();
+ txn.close();
+
+ sink.process();
+ }
+ checkRecordCountInTable(totalRecords, dbName, tblName);
+ sink.stop();
+ checkRecordCountInTable(totalRecords, dbName, tblName);
+ }
+
+ private void sleep(int n) {
+ try {
+ Thread.sleep(n);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt status
+ }
+ }
+
+ private static Channel startSink(HiveSink sink, Context context) {
+ Configurables.configure(sink, context);
+
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, context);
+ sink.setChannel(channel);
+ sink.start();
+ return channel;
+ }
+
+ private void checkRecordCountInTable(int expectedCount, String db, String tbl)
+ throws CommandNeedRetryException, IOException {
+ int count = TestUtil.listRecordsInTable(driver, db, tbl).size();
+ Assert.assertEquals(expectedCount, count);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
new file mode 100644
index 0000000..174f179
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import junit.framework.Assert;
+import org.apache.flume.Context;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class TestHiveWriter {
+ final static String dbName = "testing";
+ final static String tblName = "alerts";
+
+ public static final String PART1_NAME = "continent";
+ public static final String PART2_NAME = "country";
+ public static final String[] partNames = { PART1_NAME, PART2_NAME };
+
+ private static final String COL1 = "id";
+ private static final String COL2 = "msg";
+ final String[] colNames = {COL1,COL2};
+ private String[] colTypes = { "int", "string" };
+
+ private static final String PART1_VALUE = "Asia";
+ private static final String PART2_VALUE = "India";
+ private final ArrayList<String> partVals;
+
+ private final String metaStoreURI;
+
+ private HiveDelimitedTextSerializer serializer;
+
+ private final HiveConf conf;
+
+ private ExecutorService callTimeoutPool;
+ int timeout = 10000; // msec
+
+ @Rule
+ public TemporaryFolder dbFolder = new TemporaryFolder();
+
+ private final Driver driver;
+
+ public TestHiveWriter() throws Exception {
+ partVals = new ArrayList<String>(2);
+ partVals.add(PART1_VALUE);
+ partVals.add(PART2_VALUE);
+
+ metaStoreURI = null;
+
+ int callTimeoutPoolSize = 1;
+ callTimeoutPool = Executors.newFixedThreadPool(callTimeoutPoolSize,
+ new ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build());
+
+ // 1) Start metastore
+ conf = new HiveConf(this.getClass());
+ TestUtil.setConfValues(conf);
+ if (metaStoreURI != null) {
+ conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+ }
+
+ // 2) Setup Hive client
+ SessionState.start(new CliSessionState(conf));
+ driver = new Driver(conf);
+
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ // 1) prepare hive
+ TxnDbUtil.cleanDb();
+ TxnDbUtil.prepDb();
+
+ // 2) Setup tables
+ TestUtil.dropDB(conf, dbName);
+ String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+ TestUtil.createDbAndTable(driver, dbName, tblName, partVals, colNames, colTypes
+ , partNames, dbLocation);
+
+ // 3) Setup serializer
+ Context ctx = new Context();
+ ctx.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+ serializer = new HiveDelimitedTextSerializer();
+ serializer.configure(ctx);
+ }
+
+ @Test
+ public void testInstantiate() throws Exception {
+ HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+ SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+ HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+ , callTimeoutPool, "flumetest", serializer, sinkCounter);
+
+ writer.close();
+ }
+
+ @Test
+ public void testWriteBasic() throws Exception {
+ HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+ SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+ HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+ , callTimeoutPool, "flumetest", serializer, sinkCounter);
+
+ writeEvents(writer,3);
+ writer.flush(false);
+ writer.close();
+ checkRecordCountInTable(3);
+ }
+
+ @Test
+ public void testWriteMultiFlush() throws Exception {
+ HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+ SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+
+ HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+ , callTimeoutPool, "flumetest", serializer, sinkCounter);
+
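+ // rows become visible only after each flush(true) commits the open transaction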
+ checkRecordCountInTable(0);
+ SimpleEvent event = new SimpleEvent();
+
+ String REC1 = "1,xyz,Hello world,abc";
+ event.setBody(REC1.getBytes());
+ writer.write(event);
+ checkRecordCountInTable(0);
+ writer.flush(true);
+ checkRecordCountInTable(1);
+
+ String REC2 = "2,xyz,Hello world,abc";
+ event.setBody(REC2.getBytes());
+ writer.write(event);
+ checkRecordCountInTable(1);
+ writer.flush(true);
+ checkRecordCountInTable(2);
+
+ String REC3 = "3,xyz,Hello world,abc";
+ event.setBody(REC3.getBytes());
+ writer.write(event);
+ writer.flush(true);
+ checkRecordCountInTable(3);
+ writer.close();
+
+ checkRecordCountInTable(3);
+ }
+
+ private void checkRecordCountInTable(int expectedCount)
+ throws CommandNeedRetryException, IOException {
+ int count = TestUtil.listRecordsInTable(driver, dbName, tblName).size();
+ Assert.assertEquals(expectedCount, count);
+ }
+
+ /**
+ * Sets up the input fields to have the same order as the table columns,
+ * and sets the SerDe separator to match the input field separator.
+ * @throws Exception
+ */
+ @Test
+ public void testInOrderWrite() throws Exception {
+ HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+ SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+ int timeout = 5000; // msec
+
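+ // fieldnames follow the column order and serdeSeparator matches the input
+ // separator, so no field reordering is needed (see javadoc above)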
+ HiveDelimitedTextSerializer serializer2 = new HiveDelimitedTextSerializer();
+ Context ctx = new Context();
+ ctx.put("serializer.fieldnames", COL1 + "," + COL2);
+ ctx.put("serializer.serdeSeparator", ",");
+ serializer2.configure(ctx);
+
+
+ HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool,
+ "flumetest", serializer2, sinkCounter);
+
+ SimpleEvent event = new SimpleEvent();
+ event.setBody("1,Hello world 1".getBytes());
+ writer.write(event);
+ event.setBody("2,Hello world 2".getBytes());
+ writer.write(event);
+ event.setBody("3,Hello world 3".getBytes());
+ writer.write(event);
+ writer.flush(false);
+ writer.close();
+ }
+
+ @Test
+ public void testSerdeSeparatorCharParsing() throws Exception {
+ HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+ SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+ int timeout = 10000; // msec
+
+ // 1) single character serdeSeparator
+ HiveDelimitedTextSerializer serializer1 = new HiveDelimitedTextSerializer();
+ Context ctx = new Context();
+ ctx.put("serializer.fieldnames", COL1 + "," + COL2);
+ ctx.put("serializer.serdeSeparator", ",");
+ serializer1.configure(ctx);
+ // should not throw
+
+
+ // 2) special character as serdeSeparator
+ HiveDelimitedTextSerializer serializer2 = new HiveDelimitedTextSerializer();
+ ctx = new Context();
+ ctx.put("serializer.fieldnames", COL1 + "," + COL2);
+ ctx.put("serializer.serdeSeparator", "'\t'");
+ serializer2.configure(ctx);
+ // should not throw
+
+
+ // 3) bad spec as serdeSeparator
+ HiveDelimitedTextSerializer serializer3 = new HiveDelimitedTextSerializer();
+ ctx = new Context();
+ ctx.put("serializer.fieldnames", COL1 + "," + COL2);
+ ctx.put("serializer.serdeSeparator", "ab");
+ try {
+ serializer3.configure(ctx);
+ Assert.assertTrue("Bad serdeSeparator character was accepted" ,false);
+ } catch (Exception e){
+ // expect an exception
+ }
+
+ }
+
+
+ @Test
+ public void testSecondWriterBeforeFirstCommits() throws Exception {
+ // here we open a new writer while the first is still writing (not committed)
+ HiveEndPoint endPoint1 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+ ArrayList<String> partVals2 = new ArrayList<String>(2);
+ partVals2.add(PART1_VALUE);
+ partVals2.add("Nepal");
+ HiveEndPoint endPoint2 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals2);
+
+ SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName());
+ SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName());
+
+ HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout
+ , callTimeoutPool, "flumetest", serializer, sinkCounter1);
+
+ writeEvents(writer1, 3);
+
+ HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout
+ , callTimeoutPool, "flumetest", serializer, sinkCounter2);
+ writeEvents(writer2, 3);
+ writer2.flush(false); // commit
+
+ writer1.flush(false); // commit
+ writer1.close();
+
+ writer2.close();
+ }
+
+
+ @Test
+ public void testSecondWriterAfterFirstCommits() throws Exception {
+ // here we open a new writer after the first writer has committed one txn
+ HiveEndPoint endPoint1 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+ ArrayList<String> partVals2 = new ArrayList<String>(2);
+ partVals2.add(PART1_VALUE);
+ partVals2.add("Nepal");
+ HiveEndPoint endPoint2 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals2);
+
+ SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName());
+ SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName());
+
+ HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout
+ , callTimeoutPool, "flumetest", serializer, sinkCounter1);
+
+ writeEvents(writer1, 3);
+
+ writer1.flush(false); // commit
+
+
+ HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout
+ , callTimeoutPool, "flumetest", serializer, sinkCounter2);
+ writeEvents(writer2, 3);
+ writer2.flush(false); // commit
+
+
+ writer1.close();
+ writer2.close();
+ }
+
+
+ private void writeEvents(HiveWriter writer, int count) throws InterruptedException, HiveWriter.WriteException {
+ SimpleEvent event = new SimpleEvent();
+ for (int i = 1; i <= count; i++) {
+ event.setBody((i + ",xyz,Hello world,abc").getBytes());
+ writer.write(event);
+ }
+ }
+}