Posted to commits@flume.apache.org by hs...@apache.org on 2015/02/10 08:16:18 UTC

[2/4] flume git commit: FLUME-1734. Add a Hive Sink based on Hive Streaming support.
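
For orientation, here is a minimal sketch of how the new sink might be wired up programmatically, mirroring what TestHiveSink in this patch does; in a deployed agent the same keys would simply go into the agent's properties file under the sink's name. The metastore URI, database, table and field names below are placeholders, and the configuration keys come from Config.java and HiveDelimitedTextSerializer further down in this commit.

    import org.apache.flume.Context;
    import org.apache.flume.channel.MemoryChannel;
    import org.apache.flume.conf.Configurables;
    import org.apache.flume.sink.hive.HiveSink;

    public class HiveSinkWiringSketch {
      public static void main(String[] args) {
        Context context = new Context();
        context.put("hive.metastore", "thrift://localhost:9083"); // placeholder URI
        context.put("hive.database", "testing");
        context.put("hive.table", "alerts");
        context.put("hive.partition", "Asia,India");   // static partition values
        context.put("serializer", "DELIMITED");
        context.put("serializer.fieldnames", "id,msg");
        context.put("batchSize", "100");

        HiveSink sink = new HiveSink();
        sink.setName("hive-sink-1");
        Configurables.configure(sink, context);

        MemoryChannel channel = new MemoryChannel();
        Configurables.configure(channel, new Context());
        sink.setChannel(channel);

        channel.start();
        sink.start();
        // ... put events on the channel, then call sink.process() ...
      }
    }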

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat
new file mode 100644
index 0000000..fd69350
Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e50.dat differ

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat
new file mode 100644
index 0000000..9ae5d87
Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e60.dat differ

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat
new file mode 100644
index 0000000..762cc2a
Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e71.dat differ

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat
new file mode 100644
index 0000000..fd69350
Binary files /dev/null and b/flume-ng-sinks/flume-hive-sink/metastore_db/seg0/d1e80.dat differ

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties b/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties
new file mode 100644
index 0000000..e385a7b
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/metastore_db/service.properties
@@ -0,0 +1,22 @@
+#/Users/hshreedharan/work/flume-latest/flume/flume-ng-sinks/flume-hive-sink/metastore_db
+# ********************************************************************
+# ***                Please do NOT edit this file.                 ***
+# *** CHANGING THE CONTENT OF THIS FILE MAY CAUSE DATA CORRUPTION. ***
+# ********************************************************************
+#Mon Feb 09 20:19:39 PST 2015
+SysschemasIndex2Identifier=225
+SyscolumnsIdentifier=144
+SysconglomeratesIndex1Identifier=49
+SysconglomeratesIdentifier=32
+SyscolumnsIndex2Identifier=177
+SysschemasIndex1Identifier=209
+SysconglomeratesIndex3Identifier=81
+SystablesIndex2Identifier=129
+SyscolumnsIndex1Identifier=161
+derby.serviceProtocol=org.apache.derby.database.Database
+SysschemasIdentifier=192
+derby.storage.propertiesId=16
+SysconglomeratesIndex2Identifier=65
+derby.serviceLocale=en_US
+SystablesIdentifier=96
+SystablesIndex1Identifier=113

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/pom.xml b/flume-ng-sinks/flume-hive-sink/pom.xml
new file mode 100644
index 0000000..e5f673a
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/pom.xml
@@ -0,0 +1,189 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.flume</groupId>
+    <artifactId>flume-ng-sinks</artifactId>
+    <version>1.6.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-sinks</groupId>
+  <artifactId>flume-hive-sink</artifactId>
+  <name>Flume NG Hive Sink</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>hadoop-1.0</id>
+      <activation>
+        <property>
+          <name>!hadoop.profile</name>
+        </property>
+      </activation>
+
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <version>${hadoop.version}</version>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hadoop-2</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>2</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <version>${hadoop.version}</version>
+          <scope>test</scope>
+          <optional>true</optional>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <scope>test</scope>
+          <version>${hadoop.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <profile>
+      <id>hbase-98</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>hbase-98</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <scope>test</scope>
+          <optional>true</optional>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <scope>test</scope>
+          <version>${hadoop.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-streaming</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <scope>provided</scope>
+      <version>${hive.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!--temporary - really belongs to hive-streaming : roshan -->
+    <dependency>
+      <groupId>xerces</groupId>
+      <artifactId>xercesImpl</artifactId>
+      <scope>runtime</scope>
+      <version>2.9.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>xalan</groupId>
+      <artifactId>serializer</artifactId>
+      <version>2.7.1</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>xalan</groupId>
+      <artifactId>xalan</artifactId>
+      <scope>runtime</scope>
+      <version>2.7.1</version>
+    </dependency>
+    <!-- end temporary -->
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java
new file mode 100644
index 0000000..b2d2582
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/Config.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+public class Config {
+  public static final String HIVE_METASTORE = "hive.metastore";
+  public static final String HIVE_DATABASE = "hive.database";
+  public static final String HIVE_TABLE = "hive.table";
+  public static final String HIVE_PARTITION = "hive.partition";
+  public static final String HIVE_TXNS_PER_BATCH_ASK = "hive.txnsPerBatchAsk";
+  public static final String BATCH_SIZE = "batchSize";
+  public static final String IDLE_TIMEOUT = "idleTimeout";
+  public static final String CALL_TIMEOUT = "callTimeout";
+  public static final String HEART_BEAT_INTERVAL = "heartBeatInterval";
+  public static final String MAX_OPEN_CONNECTIONS = "maxOpenConnections";
+  public static final String USE_LOCAL_TIME_STAMP = "useLocalTimeStamp";
+  public static final String TIME_ZONE = "timeZone";
+  public static final String ROUND_UNIT = "roundUnit";
+  public static final String ROUND = "round";
+  public static final String HOUR = "hour";
+  public static final String MINUTE = "minute";
+  public static final String SECOND = "second";
+  public static final String ROUND_VALUE = "roundValue";
+  public static final String SERIALIZER = "serializer";
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java
new file mode 100644
index 0000000..8f64435
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+
+import java.io.IOException;
+
+/** Forwards the incoming event body to Hive unmodified; configures the
+ *  delimiter and the field-to-column mapping used by the record writer.
+ */
+public class HiveDelimitedTextSerializer implements HiveEventSerializer  {
+  public static final String ALIAS = "DELIMITED";
+
+  public static final String defaultDelimiter = ",";
+  public static final String SERIALIZER_DELIMITER = "serializer.delimiter";
+  public static final String SERIALIZER_FIELDNAMES = "serializer.fieldnames";
+  public static final String SERIALIZER_SERDE_SEPARATOR = "serializer.serdeSeparator";
+
+  private String delimiter;
+  private String[] fieldToColMapping = null;
+  private Character serdeSeparator = null;
+
+  @Override
+  public void write(TransactionBatch txnBatch, Event e)
+          throws StreamingException, IOException, InterruptedException {
+    txnBatch.write(e.getBody());
+  }
+
+  @Override
+  public RecordWriter createRecordWriter(HiveEndPoint endPoint)
+          throws StreamingException, IOException, ClassNotFoundException {
+    if (serdeSeparator == null) {
+      return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint);
+    }
+    return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint, null
+            , serdeSeparator);
+  }
+
+  @Override
+  public void configure(Context context) {
+    delimiter = parseDelimiterSpec(
+            context.getString(SERIALIZER_DELIMITER, defaultDelimiter) );
+    String fieldNames = context.getString(SERIALIZER_FIELDNAMES);
+    if (fieldNames == null) {
+      throw new IllegalArgumentException("serializer.fieldnames is not specified " +
+              "for serializer " + this.getClass().getName() );
+    }
+    String serdeSeparatorStr = context.getString(SERIALIZER_SERDE_SEPARATOR);
+    this.serdeSeparator = parseSerdeSeparatorSpec(serdeSeparatorStr);
+
+    // split, but preserve empty fields (-1)
+    fieldToColMapping = fieldNames.trim().split(",",-1);
+  }
+
+  // if the delimiter is double quoted, like "\t", drop the quotes
+  private static String parseDelimiterSpec(String delimiter) {
+    if (delimiter == null) {
+      return null;
+    }
+    if (delimiter.charAt(0) == '"'  &&
+         delimiter.charAt(delimiter.length()-1) == '"') {
+      return delimiter.substring(1,delimiter.length()-1);
+    }
+    return delimiter;
+  }
+
+  // if the serde separator is a single quoted character, like '\t', drop the quotes
+  private static  Character parseSerdeSeparatorSpec(String separatorStr) {
+    if (separatorStr == null) {
+      return null;
+    }
+    if (separatorStr.length() == 1) {
+      return separatorStr.charAt(0);
+    }
+    if (separatorStr.length() == 3    &&
+          separatorStr.charAt(0) == '\''  &&
+          separatorStr.charAt(2) == '\'') {
+      return  separatorStr.charAt(1);
+    }
+
+    throw new IllegalArgumentException("serializer.serdeSeparator spec is invalid " +
+            "for " + ALIAS + " serializer " );
+  }
+
+}
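
The quoting rules handled by parseDelimiterSpec() and parseSerdeSeparatorSpec() above are easiest to see with a concrete configuration. The property values here are illustrative only, a sketch rather than recommended settings.

    import org.apache.flume.Context;
    import org.apache.flume.sink.hive.HiveDelimitedTextSerializer;

    public class DelimitedSerializerConfigSketch {
      public static void main(String[] args) {
        Context ctx = new Context();
        // Double-quoted spec: parseDelimiterSpec() strips the quotes, leaving "," as the delimiter.
        ctx.put("serializer.delimiter", "\",\"");
        // Comma-separated mapping of body fields to table columns.
        ctx.put("serializer.fieldnames", "id,msg");
        // Single-quoted single character: parseSerdeSeparatorSpec() strips the quotes, leaving ','.
        ctx.put("serializer.serdeSeparator", "','");

        HiveDelimitedTextSerializer serializer = new HiveDelimitedTextSerializer();
        serializer.configure(ctx);
        // createRecordWriter(endPoint) would then build a DelimitedInputWriter with these settings.
      }
    }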

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java
new file mode 100644
index 0000000..c233d3d
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+
+import java.io.IOException;
+
+public interface HiveEventSerializer extends Configurable {
+  public void write(TransactionBatch batch, Event e)
+          throws StreamingException, IOException, InterruptedException;
+
+  RecordWriter createRecordWriter(HiveEndPoint endPoint)
+          throws StreamingException, IOException, ClassNotFoundException;
+
+}
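
Because HiveSink.createSerializer() (later in this commit) falls back to Class.forName() for unrecognized serializer names, this interface can also be implemented outside the module and referenced by its fully qualified class name in the sink's serializer setting. A hypothetical skeleton (the package and class name are made up):

    package com.example.flume;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.sink.hive.HiveEventSerializer;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    import org.apache.hive.hcatalog.streaming.RecordWriter;
    import org.apache.hive.hcatalog.streaming.StreamingException;
    import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
    import org.apache.hive.hcatalog.streaming.TransactionBatch;

    /** Example custom serializer: trims whitespace around the body before writing it as JSON. */
    public class TrimmingJsonSerializer implements HiveEventSerializer {

      @Override
      public void write(TransactionBatch txnBatch, Event e)
              throws StreamingException, IOException, InterruptedException {
        String body = new String(e.getBody(), StandardCharsets.UTF_8).trim();
        txnBatch.write(body.getBytes(StandardCharsets.UTF_8));
      }

      @Override
      public RecordWriter createRecordWriter(HiveEndPoint endPoint)
              throws StreamingException, IOException, ClassNotFoundException {
        return new StrictJsonWriter(endPoint);
      }

      @Override
      public void configure(Context context) {
        // read any serializer.* properties needed by the custom serializer here
      }
    }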

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java
new file mode 100644
index 0000000..a75073f
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveJsonSerializer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+
+import java.io.IOException;
+
+/** Forwards the incoming event body to Hive unmodified as a strict JSON
+ *  document; no delimiter or field mapping is needed.
+ */
+
+public class HiveJsonSerializer implements HiveEventSerializer  {
+  public static final String ALIAS = "JSON";
+
+  @Override
+  public void write(TransactionBatch txnBatch, Event e)
+          throws StreamingException, IOException, InterruptedException {
+    txnBatch.write(e.getBody());
+  }
+
+  @Override
+  public RecordWriter createRecordWriter(HiveEndPoint endPoint)
+          throws StreamingException, IOException, ClassNotFoundException {
+    return new StrictJsonWriter(endPoint);
+  }
+
+  @Override
+  public void configure(Context context) {
+    // no-op: the JSON serializer has nothing to configure
+  }
+
+}
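
With the JSON serializer the event body is handed to StrictJsonWriter as-is, so each body is expected to be a JSON document whose keys match the target table's columns. A hypothetical event for a table with columns (id int, msg string):

    import java.nio.charset.StandardCharsets;

    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    public class JsonEventSketch {
      public static void main(String[] args) {
        // The body must be valid JSON; field names should line up with the Hive columns.
        String json = "{\"id\": 1, \"msg\": \"disk full on host42\"}";
        Event event = EventBuilder.withBody(json, StandardCharsets.UTF_8);
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
      }
    }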

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
new file mode 100644
index 0000000..6fe332a
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.formatter.output.BucketPath;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.hive.hcatalog.streaming.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HiveSink extends AbstractSink implements Configurable {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(HiveSink.class);
+
+  private static final int DEFAULT_MAXOPENCONNECTIONS = 500;
+  private static final int DEFAULT_TXNSPERBATCH = 100;
+  private static final int DEFAULT_BATCHSIZE = 15000;
+  private static final int DEFAULT_CALLTIMEOUT = 10000;
+  private static final int DEFAULT_IDLETIMEOUT = 0;
+  private static final int DEFAULT_HEARTBEATINTERVAL = 240; // seconds
+
+
+  private Map<HiveEndPoint, HiveWriter> allWriters;
+
+  private SinkCounter sinkCounter;
+  private volatile int idleTimeout;
+  private String metaStoreUri;
+  private String proxyUser;
+  private String database;
+  private String table;
+  private List<String> partitionVals;
+  private Integer txnsPerBatchAsk;
+  private Integer batchSize;
+  private Integer maxOpenConnections;
+  private boolean autoCreatePartitions;
+  private String serializerType;
+  private HiveEventSerializer serializer;
+
+  /**
+   * Default timeout for blocking I/O calls in HiveWriter
+   */
+  private Integer callTimeout;
+  private Integer heartBeatInterval;
+
+  private ExecutorService callTimeoutPool;
+
+  private boolean useLocalTime;
+  private TimeZone timeZone;
+  private boolean needRounding;
+  private int roundUnit;
+  private Integer roundValue;
+
+  private Timer heartBeatTimer = new Timer();
+  private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
+
+  @VisibleForTesting
+  Map<HiveEndPoint, HiveWriter> getAllWriters() {
+    return allWriters;
+  }
+
+  // read configuration and setup thresholds
+  @Override
+  public void configure(Context context) {
+
+    metaStoreUri = context.getString(Config.HIVE_METASTORE);
+    if (metaStoreUri == null) {
+      throw new IllegalArgumentException(Config.HIVE_METASTORE + " config setting is not " +
+              "specified for sink " + getName());
+    }
+    if (metaStoreUri.equalsIgnoreCase("null")) { // for testing support
+      metaStoreUri = null;
+    }
+    proxyUser = null; // context.getString("hive.proxyUser"); not supported by hive api yet
+    database = context.getString(Config.HIVE_DATABASE);
+    if (database == null) {
+      throw new IllegalArgumentException(Config.HIVE_DATABASE + " config setting is not " +
+            "specified for sink " + getName());
+    }
+    table = context.getString(Config.HIVE_TABLE);
+    if (table == null) {
+      throw new IllegalArgumentException(Config.HIVE_TABLE + " config setting is not " +
+              "specified for sink " + getName());
+    }
+
+    String partitions = context.getString(Config.HIVE_PARTITION);
+    if (partitions != null) {
+      partitionVals = Arrays.asList(partitions.split(","));
+    }
+
+
+    txnsPerBatchAsk = context.getInteger(Config.HIVE_TXNS_PER_BATCH_ASK, DEFAULT_TXNSPERBATCH);
+    if (txnsPerBatchAsk < 0) {
+      LOG.warn(getName() + ". hive.txnsPerBatchAsk must be  positive number. Defaulting to "
+              + DEFAULT_TXNSPERBATCH);
+      txnsPerBatchAsk = DEFAULT_TXNSPERBATCH;
+    }
+    batchSize = context.getInteger(Config.BATCH_SIZE, DEFAULT_BATCHSIZE);
+    if (batchSize < 0) {
+      LOG.warn(getName() + ". batchSize must be  positive number. Defaulting to "
+              + DEFAULT_BATCHSIZE);
+      batchSize = DEFAULT_BATCHSIZE;
+    }
+    idleTimeout = context.getInteger(Config.IDLE_TIMEOUT, DEFAULT_IDLETIMEOUT);
+    if (idleTimeout < 0) {
+      LOG.warn(getName() + ". idleTimeout must be  positive number. Defaulting to "
+              + DEFAULT_IDLETIMEOUT);
+      idleTimeout = DEFAULT_IDLETIMEOUT;
+    }
+    callTimeout = context.getInteger(Config.CALL_TIMEOUT, DEFAULT_CALLTIMEOUT);
+    if (callTimeout < 0) {
+      LOG.warn(getName() + ". callTimeout must be  positive number. Defaulting to "
+              + DEFAULT_CALLTIMEOUT);
+      callTimeout = DEFAULT_CALLTIMEOUT;
+    }
+
+    heartBeatInterval = context.getInteger(Config.HEART_BEAT_INTERVAL, DEFAULT_HEARTBEATINTERVAL);
+    if (heartBeatInterval < 0) {
+      LOG.warn(getName() + ". heartBeatInterval must be  positive number. Defaulting to "
+              + DEFAULT_HEARTBEATINTERVAL);
+      heartBeatInterval = DEFAULT_HEARTBEATINTERVAL;
+    }
+    maxOpenConnections = context.getInteger(Config.MAX_OPEN_CONNECTIONS, DEFAULT_MAXOPENCONNECTIONS);
+    autoCreatePartitions =  context.getBoolean("autoCreatePartitions", true);
+
+    // Timestamp processing
+    useLocalTime = context.getBoolean(Config.USE_LOCAL_TIME_STAMP, false);
+
+    String tzName = context.getString(Config.TIME_ZONE);
+    timeZone = (tzName == null) ? null : TimeZone.getTimeZone(tzName);
+    needRounding = context.getBoolean(Config.ROUND, false);
+
+    String unit = context.getString(Config.ROUND_UNIT, Config.MINUTE);
+    if (unit.equalsIgnoreCase(Config.HOUR)) {
+      this.roundUnit = Calendar.HOUR_OF_DAY;
+    } else if (unit.equalsIgnoreCase(Config.MINUTE)) {
+      this.roundUnit = Calendar.MINUTE;
+    } else if (unit.equalsIgnoreCase(Config.SECOND)){
+      this.roundUnit = Calendar.SECOND;
+    } else {
+      LOG.warn(getName() + ". Rounding unit is not valid, please set one of " +
+              "minute, hour or second. Rounding will be disabled");
+      needRounding = false;
+    }
+    this.roundValue = context.getInteger(Config.ROUND_VALUE, 1);
+    if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){
+      Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
+              "Round value must be > 0 and <= 60");
+    } else if (roundUnit == Calendar.HOUR_OF_DAY){
+      Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
+              "Round value must be > 0 and <= 24");
+    }
+
+    // Serializer
+    serializerType = context.getString(Config.SERIALIZER, "");
+    if (serializerType.isEmpty()) {
+      throw new IllegalArgumentException("serializer config setting is not " +
+              "specified for sink " + getName());
+    }
+
+    serializer = createSerializer(serializerType);
+    serializer.configure(context);
+
+    Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");
+
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(getName());
+    }
+  }
+
+  @VisibleForTesting
+  protected SinkCounter getCounter() {
+    return sinkCounter;
+  }
+  private HiveEventSerializer createSerializer(String serializerName)  {
+    if(serializerName.compareToIgnoreCase(HiveDelimitedTextSerializer.ALIAS) == 0 ||
+            serializerName.compareTo(HiveDelimitedTextSerializer.class.getName()) == 0) {
+      return new HiveDelimitedTextSerializer();
+    } else if (serializerName.compareToIgnoreCase(HiveJsonSerializer.ALIAS) == 0 ||
+            serializerName.compareTo(HiveJsonSerializer.class.getName()) == 0) {
+      return new HiveJsonSerializer();
+    }
+
+    try {
+      return (HiveEventSerializer) Class.forName(serializerName).newInstance();
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Unable to instantiate serializer: " + serializerName
+              + " on sink: " + getName(), e);
+    }
+  }
+
+
+  /**
+   * Pull events out of channel, find corresponding HiveWriter and write to it.
+   * Take at most batchSize events per Transaction. <br/>
+   * This method is not thread safe.
+   */
+  public Status process() throws EventDeliveryException {
+    // writers used in this Txn
+
+    Channel channel = getChannel();
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    boolean success = false;
+    try {
+      // 1 Enable Heart Beats
+      if (timeToSendHeartBeat.compareAndSet(true, false)) {
+        enableHeartBeatOnAllWriters();
+      }
+
+      // 2 Drain Batch
+      int txnEventCount = drainOneBatch(channel);
+      transaction.commit();
+      success = true;
+
+      // 3 Update Counters
+      if (txnEventCount < 1) {
+        return Status.BACKOFF;
+      } else {
+        return Status.READY;
+      }
+    } catch (InterruptedException err) {
+      LOG.warn(getName() + ": Thread was interrupted.", err);
+      return Status.BACKOFF;
+    } catch (Exception e) {
+      throw new EventDeliveryException(e);
+    } finally {
+      if (!success) {
+        transaction.rollback();
+      }
+      transaction.close();
+    }
+  }
+
+  // Drains one batch of events from Channel into Hive
+  private int drainOneBatch(Channel channel)
+          throws HiveWriter.Failure, InterruptedException {
+    int txnEventCount = 0;
+    try {
+      Map<HiveEndPoint,HiveWriter> activeWriters = Maps.newHashMap();
+      for (; txnEventCount < batchSize; ++txnEventCount) {
+        // 0) Read event from Channel
+        Event event = channel.take();
+        if (event == null) {
+          break;
+        }
+
+        //1) Create end point by substituting place holders
+        HiveEndPoint endPoint = makeEndPoint(metaStoreUri, database, table,
+                partitionVals, event.getHeaders(), timeZone,
+                needRounding, roundUnit, roundValue, useLocalTime);
+
+        //2) Create or reuse Writer
+        HiveWriter writer = getOrCreateWriter(activeWriters, endPoint);
+
+        //3) Write
+        LOG.debug("{} : Writing event to {}", getName(), endPoint);
+        writer.write(event);
+
+      } // for
+
+      //4) Update counters
+      if (txnEventCount == 0) {
+        sinkCounter.incrementBatchEmptyCount();
+      } else if (txnEventCount == batchSize) {
+        sinkCounter.incrementBatchCompleteCount();
+      } else {
+        sinkCounter.incrementBatchUnderflowCount();
+      }
+      sinkCounter.addToEventDrainAttemptCount(txnEventCount);
+
+
+      // 5) Flush all Writers
+      for (HiveWriter writer : activeWriters.values()) {
+        writer.flush(true);
+      }
+
+      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+      return txnEventCount;
+    } catch (HiveWriter.Failure e) {
+      LOG.warn(getName() + " : " + e.getMessage(), e);
+      abortAllWriters();
+      closeAllWriters();
+      throw e;
+    }
+  }
+
+  private void enableHeartBeatOnAllWriters() {
+    for (HiveWriter writer : allWriters.values()) {
+      writer.setHearbeatNeeded();
+    }
+  }
+
+  private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> activeWriters,
+                                       HiveEndPoint endPoint)
+          throws HiveWriter.ConnectException, InterruptedException {
+    try {
+      HiveWriter writer = allWriters.get( endPoint );
+      if (writer == null) {
+        LOG.info(getName() + ": Creating Writer to Hive end point : " + endPoint);
+        writer = new HiveWriter(endPoint, txnsPerBatchAsk, autoCreatePartitions,
+                callTimeout, callTimeoutPool, proxyUser, serializer, sinkCounter);
+
+        sinkCounter.incrementConnectionCreatedCount();
+        if (allWriters.size() > maxOpenConnections){
+          int retired = closeIdleWriters();
+          if (retired == 0) {
+            closeEldestWriter();
+          }
+        }
+        allWriters.put(endPoint, writer);
+        activeWriters.put(endPoint, writer);
+      }
+      else {
+        if (activeWriters.get(endPoint) == null)  {
+          activeWriters.put(endPoint,writer);
+        }
+      }
+      return writer;
+    } catch (HiveWriter.ConnectException e) {
+      sinkCounter.incrementConnectionFailedCount();
+      throw e;
+    }
+
+  }
+
+  private HiveEndPoint makeEndPoint(String metaStoreUri, String database, String table,
+                                    List<String> partVals, Map<String, String> headers,
+                                    TimeZone timeZone, boolean needRounding,
+                                    int roundUnit, Integer roundValue,
+                                    boolean useLocalTime)  {
+    if (partVals == null) {
+      return new HiveEndPoint(metaStoreUri, database, table, null);
+    }
+
+    ArrayList<String> realPartVals = Lists.newArrayList();
+    for (String partVal : partVals) {
+      realPartVals.add(BucketPath.escapeString(partVal, headers, timeZone,
+              needRounding, roundUnit, roundValue, useLocalTime));
+    }
+    return new HiveEndPoint(metaStoreUri, database, table, realPartVals);
+  }
+
+  /**
+   * Locate the writer that has not been used for the longest time and retire it
+   */
+  private void closeEldestWriter() throws InterruptedException {
+    long oldestTimeStamp = System.currentTimeMillis();
+    HiveEndPoint eldest = null;
+    for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+      if (entry.getValue().getLastUsed() < oldestTimeStamp) {
+        eldest = entry.getKey();
+        oldestTimeStamp = entry.getValue().getLastUsed();
+      }
+    }
+
+    try {
+      sinkCounter.incrementConnectionClosedCount();
+      LOG.info(getName() + ": Closing least used Writer to Hive EndPoint : " + eldest);
+      allWriters.remove(eldest).close();
+    } catch (InterruptedException e) {
+      LOG.warn(getName() + ": Interrupted when attempting to close writer for end point: "
+              + eldest, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Locate all writers past idle timeout and retire them
+   * @return number of writers retired
+   */
+  private int closeIdleWriters() throws InterruptedException {
+    int count = 0;
+    long now = System.currentTimeMillis();
+    ArrayList<HiveEndPoint> retirees = Lists.newArrayList();
+
+    //1) Find retirement candidates
+    for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+      if (now - entry.getValue().getLastUsed() > idleTimeout) {
+        ++count;
+        retirees.add(entry.getKey());
+      }
+    }
+    //2) Retire them
+    for(HiveEndPoint ep : retirees) {
+      sinkCounter.incrementConnectionClosedCount();
+      LOG.info(getName() + ": Closing idle Writer to Hive end point : {}", ep);
+      allWriters.remove(ep).close();
+    }
+    return count;
+  }
+
+  /**
+   * Closes all writers and removes them from the cache
+   * (used when tearing down after a write failure)
+   */
+  private void closeAllWriters() throws InterruptedException {
+    //1) Retire writers
+    for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+        entry.getValue().close();
+    }
+
+    //2) Clear cache
+    allWriters.clear();
+  }
+
+  /**
+   * Aborts the current Txn on all writers
+   * (called before closing them after a write failure)
+   */
+  private void abortAllWriters() throws InterruptedException {
+    for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
+        entry.getValue().abort();
+    }
+  }
+
+  @Override
+  public void stop() {
+    // do not constrain close() calls with a timeout
+    for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
+      try {
+        HiveWriter w = entry.getValue();
+        LOG.info("Closing connection to {}", w);
+        w.closeConnection();
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    // shut down all thread pools
+    callTimeoutPool.shutdown();
+    try {
+      while (!callTimeoutPool.isTerminated()) {
+        callTimeoutPool.awaitTermination(
+              Math.max(DEFAULT_CALLTIMEOUT, callTimeout), TimeUnit.MILLISECONDS);
+      }
+    } catch (InterruptedException ex) {
+      LOG.warn(getName() + ":Shutdown interrupted on " + callTimeoutPool, ex);
+    }
+
+    callTimeoutPool = null;
+    allWriters.clear();
+    allWriters = null;
+    sinkCounter.stop();
+    super.stop();
+    LOG.info("Hive Sink {} stopped", getName() );
+  }
+
+  @Override
+  public void start() {
+    String timeoutName = "hive-" + getName() + "-call-runner-%d";
+    // the call timeout pool needs only one thread as the sink is effectively single threaded
+    callTimeoutPool = Executors.newFixedThreadPool(1,
+            new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+
+    this.allWriters = Maps.newHashMap();
+    sinkCounter.start();
+    super.start();
+    setupHeartBeatTimer();
+    LOG.info("Hive Sink {} started", getName());
+  }
+
+  private void setupHeartBeatTimer() {
+    if (heartBeatInterval > 0) {
+      heartBeatTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          timeToSendHeartBeat.set(true);
+          setupHeartBeatTimer();
+        }
+      }, heartBeatInterval * 1000);
+    }
+  }
+
+
+  @Override
+  public String toString() {
+    return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
+            " }";
+  }
+
+}
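
makeEndPoint() above runs every configured partition value through BucketPath.escapeString(), so hive.partition can mix literal values, %{header} references and date escapes. A small, hypothetical illustration of that substitution (the header names and escape strings are made up for this sketch):

    import java.util.Calendar;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flume.formatter.output.BucketPath;

    public class PartitionSubstitutionSketch {
      public static void main(String[] args) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("country", "India");
        headers.put("timestamp", "1423500000000"); // ms epoch, normally set by the source or sink

        // Same call makeEndPoint() issues for each comma-separated partition value.
        String continent = BucketPath.escapeString("Asia", headers, null, false, Calendar.MINUTE, 1, false);
        String country = BucketPath.escapeString("%{country}", headers, null, false, Calendar.MINUTE, 1, false);
        String day = BucketPath.escapeString("%y-%m-%d", headers, null, false, Calendar.MINUTE, 1, false);

        System.out.println(continent + "/" + country + "/" + day); // e.g. Asia/India/15-02-09
      }
    }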

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
new file mode 100644
index 0000000..4a06feb
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hive.hcatalog.streaming.*;
+
+import org.apache.flume.Event;
+
+import org.apache.flume.instrumentation.SinkCounter;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Internal API intended for HiveSink use.
+ */
+class HiveWriter {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(HiveWriter.class);
+
+  private final HiveEndPoint endPoint;
+  private HiveEventSerializer serializer;
+  private final StreamingConnection connection;
+  private final int txnsPerBatch;
+  private final RecordWriter recordWriter;
+  private TransactionBatch txnBatch;
+
+  private final ExecutorService callTimeoutPool;
+
+  private final long callTimeout;
+
+  private long lastUsed; // time of last flush on this writer
+
+  private SinkCounter sinkCounter;
+  private int batchCounter;
+  private long eventCounter;
+  private long processSize;
+
+  protected boolean closed; // flag indicating HiveWriter was closed
+  private boolean autoCreatePartitions;
+
+  private boolean hearbeatNeeded = false;
+
+  HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
+             boolean autoCreatePartitions, long callTimeout,
+             ExecutorService callTimeoutPool, String hiveUser,
+             HiveEventSerializer serializer, SinkCounter sinkCounter)
+          throws ConnectException, InterruptedException {
+    try {
+      this.autoCreatePartitions = autoCreatePartitions;
+      this.sinkCounter = sinkCounter;
+      this.callTimeout = callTimeout;
+      this.callTimeoutPool = callTimeoutPool;
+      this.endPoint = endPoint;
+      this.connection = newConnection(hiveUser);
+      this.txnsPerBatch = txnsPerBatch;
+      this.serializer = serializer;
+      this.recordWriter = serializer.createRecordWriter(endPoint);
+      this.txnBatch = nextTxnBatch(recordWriter);
+      this.closed = false;
+      this.lastUsed = System.currentTimeMillis();
+    } catch (InterruptedException e) {
+      throw e;
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new ConnectException(endPoint, e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return endPoint.toString();
+  }
+
+  /**
+   * Clear the class counters
+   */
+  private void resetCounters() {
+    eventCounter = 0;
+    processSize = 0;
+    batchCounter = 0;
+  }
+
+  void setHearbeatNeeded() {
+    hearbeatNeeded = true;
+  }
+
+
+  /**
+   * Write the event and update stats
+   * @param event the event to write
+   * @throws WriteException
+   * @throws InterruptedException
+   */
+  public synchronized void write(final Event event)
+          throws WriteException, InterruptedException {
+    if (closed) {
+      throw new IllegalStateException("Writer closed. Cannot write to : " + endPoint);
+    }
+
+    // write the event
+    try {
+      timedCall(new CallRunner1<Void>() {
+        @Override
+        public Void call() throws InterruptedException, StreamingException {
+          try {
+            serializer.write(txnBatch, event);
+            return null;
+          } catch (IOException e) {
+            throw new StreamingIOFailure(e.getMessage(), e);
+          }
+        }
+      });
+    } catch (StreamingException e) {
+      throw new WriteException(endPoint, txnBatch.getCurrentTxnId(), e);
+    } catch (TimeoutException e) {
+      throw new WriteException(endPoint, txnBatch.getCurrentTxnId(), e);
+    }
+
+    // Update Statistics
+    processSize += event.getBody().length;
+    eventCounter++;
+  }
+
+  /**
+   * Commits the current Txn.
+   * If 'rollToNext' is true, will switch to next Txn in batch or to a
+   *       new TxnBatch if current Txn batch is exhausted
+   */
+  public void flush(boolean rollToNext)
+          throws CommitException, TxnBatchException, TxnFailure, InterruptedException {
+    //0 Heart beat on TxnBatch
+    if(hearbeatNeeded) {
+      hearbeatNeeded = false;
+      heartBeat();
+    }
+    lastUsed = System.currentTimeMillis();
+
+    try {
+      //1 commit txn & close batch if needed
+      commitTxn();
+      if(txnBatch.remainingTransactions() == 0) {
+        closeTxnBatch();
+        txnBatch = null;
+        if(rollToNext) {
+          txnBatch = nextTxnBatch(recordWriter);
+        }
+      }
+
+      //2 roll to next Txn
+      if(rollToNext) {
+        LOG.debug("Switching to next Txn for {}", endPoint);
+        txnBatch.beginNextTransaction(); // does not block
+      }
+    } catch (StreamingException e) {
+      throw new TxnFailure(txnBatch, e);
+    }
+  }
+
+  /**
+   * Aborts the current Txn.
+   * @throws InterruptedException if interrupted while aborting
+   */
+  public void abort()  throws InterruptedException {
+    abortTxn();
+  }
+
+  /** Sends a heartbeat on the current and remaining txns in the batch, using
+   *  the call timeout pool to bound the potentially blocking call
+   */
+  public void heartBeat() throws InterruptedException  {
+    // 1) schedule the heartbeat on one thread in pool
+    try {
+      timedCall(new CallRunner1<Void>() {
+        @Override
+        public Void call() throws StreamingException {
+          LOG.info("Sending heartbeat on batch " + txnBatch);
+          txnBatch.heartbeat();
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
+      // Suppressing exceptions as we don't care for errors on heartbeats
+    }
+  }
+
+  /**
+   * Close the Transaction Batch and connection
+   * (blocking calls are bounded by the call timeout)
+   * @throws InterruptedException
+   */
+  public void close() throws InterruptedException {
+    closeTxnBatch();
+    closeConnection();
+    closed = true;
+  }
+
+  public void closeConnection() throws InterruptedException {
+    LOG.info("Closing connection to EndPoint : {}", endPoint);
+    try {
+      timedCall(new CallRunner1<Void>() {
+        @Override
+        public Void call() {
+          connection.close(); // could block
+          return null;
+        }
+      });
+      sinkCounter.incrementConnectionClosedCount();
+    } catch (Exception e) {
+      LOG.warn("Error closing connection to EndPoint : " + endPoint, e);
+      // Suppressing exceptions as we don't care for errors on connection close
+    }
+  }
+
+  private void commitTxn() throws CommitException, InterruptedException {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Committing Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint);
+    }
+    try {
+      timedCall(new CallRunner1<Void>() {
+        @Override
+        public Void call() throws StreamingException, InterruptedException {
+          txnBatch.commit(); // could block
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      throw new CommitException(endPoint, txnBatch.getCurrentTxnId(), e);
+    }
+  }
+
+  private void abortTxn() throws InterruptedException {
+    LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
+    try {
+      timedCall(new CallRunner1<Void>() {
+        @Override
+        public Void call() throws StreamingException, InterruptedException {
+          txnBatch.abort(); // could block
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw e;
+    } catch (TimeoutException e) {
+      LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+    } catch (Exception e) {
+      LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+      // Suppressing exceptions as we don't care for errors on abort
+    }
+  }
+
+  private StreamingConnection newConnection(final String proxyUser)
+          throws InterruptedException, ConnectException {
+    try {
+      return  timedCall(new CallRunner1<StreamingConnection>() {
+        @Override
+        public StreamingConnection call() throws InterruptedException, StreamingException {
+          return endPoint.newConnection(autoCreatePartitions); // could block
+        }
+      });
+    } catch (Exception e) {
+      throw new ConnectException(endPoint, e);
+    }
+  }
+
+  private TransactionBatch nextTxnBatch(final RecordWriter recordWriter)
+          throws InterruptedException, TxnBatchException {
+    LOG.debug("Fetching new Txn Batch for {}", endPoint);
+    TransactionBatch batch = null;
+    try {
+      batch = timedCall(new CallRunner1<TransactionBatch>() {
+        @Override
+        public TransactionBatch call() throws InterruptedException, StreamingException {
+          return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
+        }
+      });
+      LOG.info("Acquired Txn Batch {}. Switching to first txn", batch);
+      batch.beginNextTransaction();
+    } catch (Exception e) {
+      throw new TxnBatchException(endPoint, e);
+    }
+    return batch;
+  }
+
+  private void closeTxnBatch() throws InterruptedException {
+    try {
+      LOG.debug("Closing Txn Batch {}", txnBatch);
+      timedCall(new CallRunner1<Void>() {
+        @Override
+        public Void call() throws InterruptedException, StreamingException {
+          txnBatch.close(); // could block
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.warn("Error closing Txn Batch " + txnBatch, e);
+      // Suppressing exceptions as we don't care for errors on batch close
+    }
+  }
+
+  private <T> T timedCall(final CallRunner1<T> callRunner)
+          throws TimeoutException, InterruptedException, StreamingException {
+    Future<T> future = callTimeoutPool.submit(new Callable<T>() {
+      @Override
+      public T call() throws StreamingException, InterruptedException {
+        return callRunner.call();
+      }
+    });
+
+    try {
+      if (callTimeout > 0) {
+        return future.get(callTimeout, TimeUnit.MILLISECONDS);
+      } else {
+        return future.get();
+      }
+    } catch (TimeoutException eT) {
+      future.cancel(true);
+      sinkCounter.incrementConnectionFailedCount();
+      throw eT;
+    } catch (ExecutionException e1) {
+      sinkCounter.incrementConnectionFailedCount();
+      Throwable cause = e1.getCause();
+      if (cause instanceof IOException ) {
+        throw new StreamingIOFailure("I/O Failure", (IOException) cause);
+      } else if (cause instanceof StreamingException) {
+        throw (StreamingException) cause;
+      } else if (cause instanceof TimeoutException) {
+        throw new StreamingException("Operation Timed Out.", (TimeoutException) cause);
+      } else if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      } else if (cause instanceof InterruptedException) {
+        throw (InterruptedException) cause;
+      }
+      throw new StreamingException(e1.getMessage(), e1);
+    }
+  }
+
+  long getLastUsed() {
+    return lastUsed;
+  }
+
+  /**
+   * Simple interface whose <tt>call</tt> method is run, with a bounded
+   * timeout, on the sink's call timeout pool
+   * (see {@link #timedCall}).
+   * @param <T> return type of the call
+   */
+  private interface CallRunner<T> {
+    T call() throws Exception;
+  }
+
+
+  private interface CallRunner1<T> {
+    T call() throws StreamingException, InterruptedException;
+  }
+
+
+  public static class Failure extends Exception {
+    public Failure(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+
+  public static class WriteException extends Failure {
+    public WriteException(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) {
+      super("Failed writing to : " + endPoint + ". TxnID : " + currentTxnId, cause);
+    }
+  }
+
+  public static class CommitException extends Failure {
+    public CommitException(HiveEndPoint endPoint, Long txnID, Throwable cause) {
+      super("Commit of Txn " + txnID +  " failed on EndPoint: " + endPoint, cause);
+    }
+  }
+
+  public static class ConnectException extends Failure {
+    public ConnectException(HiveEndPoint ep, Throwable cause) {
+      super("Failed connecting to EndPoint " + ep, cause);
+    }
+  }
+
+  public static class TxnBatchException extends Failure {
+    public TxnBatchException(HiveEndPoint ep, Throwable cause) {
+      super("Failed acquiring Transaction Batch from EndPoint: " + ep, cause);
+    }
+  }
+
+  private class TxnFailure extends Failure {
+    public TxnFailure(TransactionBatch txnBatch, Throwable cause) {
+      super("Failed switching to next Txn in TxnBatch " + txnBatch, cause);
+    }
+  }
+}
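
Every potentially blocking Hive call in HiveWriter goes through timedCall(), which hands the work to the sink's single-thread pool and bounds it with callTimeout. The same pattern in isolation, stripped of the Hive-specific exception mapping (a generic sketch, not Flume or Hive API):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class TimedCallSketch {
      private static final ExecutorService pool = Executors.newFixedThreadPool(1);

      /** Run the task on the pool, waiting at most timeoutMs; cancel it on timeout. */
      static <T> T timedCall(Callable<T> task, long timeoutMs) throws Exception {
        Future<T> future = pool.submit(task);
        try {
          return timeoutMs > 0 ? future.get(timeoutMs, TimeUnit.MILLISECONDS) : future.get();
        } catch (TimeoutException e) {
          future.cancel(true); // interrupt the blocked call, as HiveWriter does
          throw e;
        }
      }

      public static void main(String[] args) throws Exception {
        String result = timedCall(new Callable<String>() {
          public String call() throws Exception {
            Thread.sleep(100); // stand-in for a blocking metastore/streaming call
            return "committed";
          }
        }, 10000);
        System.out.println(result);
        pool.shutdown();
      }
    }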

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
new file mode 100644
index 0000000..46724f2
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.flume.sink.hive;
+
+import com.google.common.collect.Lists;
+import junit.framework.Assert;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.UUID;
+
+public class TestHiveSink {
+  // 1)  partitioned table
+  final static String dbName = "testing";
+  final static String tblName = "alerts";
+
+  public static final String PART1_NAME = "continent";
+  public static final String PART2_NAME = "country";
+  public static final String[] partNames = { PART1_NAME, PART2_NAME };
+
+  private static final String COL1 = "id";
+  private static final String COL2 = "msg";
+  final String[] colNames = {COL1,COL2};
+  private String[] colTypes = { "int", "string" };
+
+  private static final String PART1_VALUE = "Asia";
+  private static final String PART2_VALUE = "India";
+  private final ArrayList<String> partitionVals;
+
+  // 2) un-partitioned table
+  final static String dbName2 = "testing2";
+  final static String tblName2 = "alerts2";
+  final String[] colNames2 = {COL1,COL2};
+  private String[] colTypes2 = { "int", "string" };
+
+  HiveSink sink = new HiveSink();
+
+  private final HiveConf conf;
+
+  private final Driver driver;
+
+  final String metaStoreURI;
+
+  @Rule
+  public TemporaryFolder dbFolder = new TemporaryFolder();
+
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestHiveSink.class);
+
+  public TestHiveSink() throws Exception {
+    partitionVals = new ArrayList<String>(2);
+    partitionVals.add(PART1_VALUE);
+    partitionVals.add(PART2_VALUE);
+
+    metaStoreURI = "null";
+
+    conf = new HiveConf(this.getClass());
+    TestUtil.setConfValues(conf);
+
+    // 1) prepare hive
+    TxnDbUtil.cleanDb();
+    TxnDbUtil.prepDb();
+
+    // 2) Setup Hive client
+    SessionState.start(new CliSessionState(conf));
+    driver = new Driver(conf);
+
+  }
+
+
+  @Before
+  public void setUp() throws Exception {
+    TestUtil.dropDB(conf, dbName);
+
+    sink = new HiveSink();
+    sink.setName("HiveSink-" + UUID.randomUUID().toString());
+
+    String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    TestUtil.createDbAndTable(driver, dbName, tblName, partitionVals, colNames,
+            colTypes, partNames, dbLocation);
+  }
+
+  @After
+  public void tearDown() throws MetaException, HiveException {
+    TestUtil.dropDB(conf, dbName);
+  }
+
+
+  @Test
+  public void testSingleWriterSimplePartitionedTable()
+          throws EventDeliveryException, IOException, CommandNeedRetryException {
+    int totalRecords = 4;
+    int batchSize = 2;
+    int batchCount = totalRecords / batchSize;
+
+    Context context = new Context();
+    context.put("hive.metastore", metaStoreURI);
+    context.put("hive.database",dbName);
+    context.put("hive.table",tblName);
+    context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+    context.put("autoCreatePartitions","false");
+    context.put("batchSize","" + batchSize);
+    context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
+    context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+    context.put("heartBeatInterval", "0");
+
+    Channel channel = startSink(sink, context);
+
+    List<String> bodies = Lists.newArrayList();
+
+    // push all the events into the channel in a single transaction
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    for (int j = 1; j <= totalRecords; j++) {
+      Event event = new SimpleEvent();
+      String body = j + ",blah,This is a log message,other stuff";
+      event.setBody(body.getBytes());
+      bodies.add(body);
+      channel.put(event);
+    }
+    // commit the channel transaction; the sink drains the events below
+    txn.commit();
+    txn.close();
+
+
+    checkRecordCountInTable(0, dbName, tblName);
+    for (int i = 0; i < batchCount ; i++) {
+      sink.process();
+    }
+    sink.stop();
+    checkRecordCountInTable(totalRecords, dbName, tblName);
+  }
+
+  @Test
+  public void testSingleWriterSimpleUnPartitionedTable()
+          throws Exception {
+    TestUtil.dropDB(conf, dbName2);
+    String dbLocation = dbFolder.newFolder(dbName2).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    TestUtil.createDbAndTable(driver, dbName2, tblName2, null, colNames2, colTypes2
+            , null, dbLocation);
+
+    try {
+      int totalRecords = 4;
+      int batchSize = 2;
+      int batchCount = totalRecords / batchSize;
+
+      Context context = new Context();
+      context.put("hive.metastore", metaStoreURI);
+      context.put("hive.database", dbName2);
+      context.put("hive.table", tblName2);
+      context.put("autoCreatePartitions","false");
+      context.put("batchSize","" + batchSize);
+      context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
+      context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+      context.put("heartBeatInterval", "0");
+
+      Channel channel = startSink(sink, context);
+
+      List<String> bodies = Lists.newArrayList();
+
+      // Push all the events into the channel in a single transaction
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      for (int j = 1; j <= totalRecords; j++) {
+        Event event = new SimpleEvent();
+        String body = j + ",blah,This is a log message,other stuff";
+        event.setBody(body.getBytes());
+        bodies.add(body);
+        channel.put(event);
+      }
+
+      txn.commit();
+      txn.close();
+
+      checkRecordCountInTable(0, dbName2, tblName2);
+      for (int i = 0; i < batchCount ; i++) {
+        sink.process();
+      }
+
+      // check before & after  stopping sink
+      checkRecordCountInTable(totalRecords, dbName2, tblName2);
+      sink.stop();
+      checkRecordCountInTable(totalRecords, dbName2, tblName2);
+    } finally {
+      TestUtil.dropDB(conf, dbName2);
+    }
+  }
+
+  @Test
+  public void testSingleWriterUseHeaders()
+          throws Exception {
+    String[] colNames = {COL1, COL2};
+    String PART1_NAME = "country";
+    String PART2_NAME = "hour";
+    String[] partNames = {PART1_NAME, PART2_NAME};
+    List<String> partitionVals = null;
+    String PART1_VALUE = "%{" + PART1_NAME + "}";
+    String PART2_VALUE = "%y-%m-%d-%k";
+    partitionVals = new ArrayList<String>(2);
+    partitionVals.add(PART1_VALUE);
+    partitionVals.add(PART2_VALUE);
+
+    String tblName = "hourlydata";
+    TestUtil.dropDB(conf, dbName2);
+    String dbLocation = dbFolder.newFolder(dbName2).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    TestUtil.createDbAndTable(driver, dbName2, tblName, partitionVals, colNames,
+            colTypes, partNames, dbLocation);
+
+    int totalRecords = 4;
+    int batchSize = 2;
+    int batchCount = totalRecords / batchSize;
+
+    Context context = new Context();
+    context.put("hive.metastore",metaStoreURI);
+    context.put("hive.database",dbName2);
+    context.put("hive.table",tblName);
+    context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+    context.put("autoCreatePartitions","true");
+    context.put("useLocalTimeStamp", "false");
+    context.put("batchSize","" + batchSize);
+    context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
+    context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+    context.put("heartBeatInterval", "0");
+
+    Channel channel = startSink(sink, context);
+
+    Calendar eventDate = Calendar.getInstance();
+    List<String> bodies = Lists.newArrayList();
+
+    // push all the events; timestamps alternate between two hours so two partitions are created
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    for (int j = 1; j <= totalRecords; j++) {
+      Event event = new SimpleEvent();
+      String body = j + ",blah,This is a log message,other stuff";
+      event.setBody(body.getBytes());
+      eventDate.clear();
+      eventDate.set(2014, 3, 3, j % batchCount, 1); // year, month, day, hour, minute
+      event.getHeaders().put( "timestamp",
+              String.valueOf(eventDate.getTimeInMillis()) );
+      event.getHeaders().put( PART1_NAME, "Asia" );
+      bodies.add(body);
+      channel.put(event);
+    }
+    // commit the channel transaction; the sink drains the events below
+    txn.commit();
+    txn.close();
+
+    checkRecordCountInTable(0, dbName2, tblName);
+    for (int i = 0; i < batchCount ; i++) {
+      sink.process();
+    }
+    checkRecordCountInTable(totalRecords, dbName2, tblName);
+    sink.stop();
+
+    // verify counters
+    SinkCounter counter = sink.getCounter();
+    Assert.assertEquals(2, counter.getConnectionCreatedCount());
+    Assert.assertEquals(2, counter.getConnectionClosedCount());
+    Assert.assertEquals(2, counter.getBatchCompleteCount());
+    Assert.assertEquals(0, counter.getBatchEmptyCount());
+    Assert.assertEquals(0, counter.getConnectionFailedCount() );
+    Assert.assertEquals(4, counter.getEventDrainAttemptCount());
+    Assert.assertEquals(4, counter.getEventDrainSuccessCount() );
+
+  }
+
+  @Test
+  public void testHeartBeat()
+          throws EventDeliveryException, IOException, CommandNeedRetryException {
+    int batchSize = 2;
+    int batchCount = 3;
+    int totalRecords = batchCount*batchSize;
+    Context context = new Context();
+    context.put("hive.metastore", metaStoreURI);
+    context.put("hive.database", dbName);
+    context.put("hive.table", tblName);
+    context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+    context.put("autoCreatePartitions","true");
+    context.put("batchSize","" + batchSize);
+    context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
+    context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+    context.put("hive.txnsPerBatchAsk", "20");
+    context.put("heartBeatInterval", "3"); // heartbeat in seconds
+
+    Channel channel = startSink(sink, context);
+
+    List<String> bodies = Lists.newArrayList();
+
+    // push the events in several small batches, processing and heartbeating after each
+    for (int i = 0; i < batchCount; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      for (int j = 1; j <= batchSize; j++) {
+        Event event = new SimpleEvent();
+        String body = i*j + ",blah,This is a log message,other stuff";
+        event.setBody(body.getBytes());
+        bodies.add(body);
+        channel.put(event);
+      }
+      // execute sink to process the events
+      txn.commit();
+      txn.close();
+
+      sink.process();
+      sleep(3000); // allow heartbeat to happen
+    }
+
+    sink.stop();
+    checkRecordCountInTable(totalRecords, dbName, tblName);
+  }
+
+  @Test
+  public void testJsonSerializer() throws Exception {
+    int batchSize = 2;
+    int batchCount = 2;
+    int totalRecords = batchCount*batchSize;
+    Context context = new Context();
+    context.put("hive.metastore",metaStoreURI);
+    context.put("hive.database",dbName);
+    context.put("hive.table",tblName);
+    context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+    context.put("autoCreatePartitions","true");
+    context.put("batchSize","" + batchSize);
+    context.put("serializer", HiveJsonSerializer.ALIAS);
+    context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+    context.put("heartBeatInterval", "0");
+
+    Channel channel = startSink(sink, context);
+
+    List<String> bodies = Lists.newArrayList();
+
+    // push the events in two batches
+    for (int i = 0; i < batchCount; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      for (int j = 1; j <= batchSize; j++) {
+        Event event = new SimpleEvent();
+        String body = "{\"id\" : 1, \"msg\" : \"using json serializer\"}";
+        event.setBody(body.getBytes());
+        bodies.add(body);
+        channel.put(event);
+      }
+      // execute sink to process the events
+      txn.commit();
+      txn.close();
+
+      sink.process();
+    }
+    checkRecordCountInTable(totalRecords, dbName, tblName);
+    sink.stop();
+    checkRecordCountInTable(totalRecords, dbName, tblName);
+  }
+
+  private void sleep(int n) {
+    try {
+      Thread.sleep(n);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
+    }
+  }
+
+  private static Channel startSink(HiveSink sink, Context context) {
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+    sink.setChannel(channel);
+    sink.start();
+    return channel;
+  }
+
+  private void checkRecordCountInTable(int expectedCount, String db, String tbl)
+          throws CommandNeedRetryException, IOException {
+    int count = TestUtil.listRecordsInTable(driver, db, tbl).size();
+    Assert.assertEquals(expectedCount, count);
+  }
+}
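
The Context keys exercised by these tests map directly onto a Flume agent properties file.
A sketch of such a configuration, assuming the sink is addressed by its fully qualified class
name and that HiveDelimitedTextSerializer.ALIAS resolves to DELIMITED (neither the short sink
alias nor the serializer alias string appears in this diff); the agent/sink/channel names and
the thrift URI are placeholders:

    # mirrors testSingleWriterSimplePartitionedTable
    a1.channels = c1
    a1.sinks = k1
    a1.channels.c1.type = memory

    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = org.apache.flume.sink.hive.HiveSink
    a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
    a1.sinks.k1.hive.database = testing
    a1.sinks.k1.hive.table = alerts
    a1.sinks.k1.hive.partition = Asia,India
    a1.sinks.k1.autoCreatePartitions = false
    a1.sinks.k1.batchSize = 2
    a1.sinks.k1.heartBeatInterval = 0
    a1.sinks.k1.serializer = DELIMITED
    # empty entries in fieldnames skip input fields: "id,,msg," takes the 1st and
    # 3rd comma-separated fields of each event body for columns id and msg
    a1.sinks.k1.serializer.fieldnames = id,,msg,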

http://git-wip-us.apache.org/repos/asf/flume/blob/a7f9255a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
new file mode 100644
index 0000000..174f179
--- /dev/null
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hive;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import junit.framework.Assert;
+import org.apache.flume.Context;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class TestHiveWriter {
+  final static String dbName = "testing";
+  final static String tblName = "alerts";
+
+  public static final String PART1_NAME = "continent";
+  public static final String PART2_NAME = "country";
+  public static final String[] partNames = { PART1_NAME, PART2_NAME };
+
+  private static final String COL1 = "id";
+  private static final String COL2 = "msg";
+  final String[] colNames = {COL1,COL2};
+  private String[] colTypes = { "int", "string" };
+
+  private static final String PART1_VALUE = "Asia";
+  private static final String PART2_VALUE = "India";
+  private final ArrayList<String> partVals;
+
+  private final String metaStoreURI;
+
+  private HiveDelimitedTextSerializer serializer;
+
+  private final HiveConf conf;
+
+  private ExecutorService callTimeoutPool;
+  int timeout = 10000; // msec
+
+  @Rule
+  public TemporaryFolder dbFolder = new TemporaryFolder();
+
+  private final Driver driver;
+
+  public TestHiveWriter() throws Exception {
+    partVals = new ArrayList<String>(2);
+    partVals.add(PART1_VALUE);
+    partVals.add(PART2_VALUE);
+
+    metaStoreURI = null;
+
+    int callTimeoutPoolSize = 1;
+    callTimeoutPool = Executors.newFixedThreadPool(callTimeoutPoolSize,
+            new ThreadFactoryBuilder().setNameFormat("hiveWriterTest").build());
+
+    // 1) Set up conf for the embedded metastore (or a remote one if a URI is given)
+    conf = new HiveConf(this.getClass());
+    TestUtil.setConfValues(conf);
+    if (metaStoreURI != null) {
+      conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+    }
+
+    // 2) Setup Hive client
+    SessionState.start(new CliSessionState(conf));
+    driver = new Driver(conf);
+
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    // 1) prepare hive
+    TxnDbUtil.cleanDb();
+    TxnDbUtil.prepDb();
+
+    // 2) Setup tables
+    TestUtil.dropDB(conf, dbName);
+    String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    TestUtil.createDbAndTable(driver, dbName, tblName, partVals, colNames, colTypes
+            , partNames, dbLocation);
+
+    // 3) Setup serializer
+    Context ctx = new Context();
+    ctx.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
+    serializer = new HiveDelimitedTextSerializer();
+    serializer.configure(ctx);
+  }
+
+  @Test
+  public void testInstantiate() throws Exception {
+    HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+    SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+    HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+            , callTimeoutPool, "flumetest", serializer, sinkCounter);
+
+    writer.close();
+  }
+
+  @Test
+  public void testWriteBasic() throws Exception {
+    HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+    SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+    HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+            , callTimeoutPool, "flumetest", serializer, sinkCounter);
+
+    writeEvents(writer,3);
+    writer.flush(false);
+    writer.close();
+    checkRecordCountInTable(3);
+  }
+
+  @Test
+  public void testWriteMultiFlush() throws Exception {
+    HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+    SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+
+    HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout
+            , callTimeoutPool, "flumetest", serializer, sinkCounter);
+
+    checkRecordCountInTable(0);
+    SimpleEvent event = new SimpleEvent();
+
+    String REC1 = "1,xyz,Hello world,abc";
+    event.setBody(REC1.getBytes());
+    writer.write(event);
+    checkRecordCountInTable(0);
+    writer.flush(true);
+    checkRecordCountInTable(1);
+
+    String REC2 = "2,xyz,Hello world,abc";
+    event.setBody(REC2.getBytes());
+    writer.write(event);
+    checkRecordCountInTable(1);
+    writer.flush(true);
+    checkRecordCountInTable(2);
+
+    String REC3 = "3,xyz,Hello world,abc";
+    event.setBody(REC3.getBytes());
+    writer.write(event);
+    writer.flush(true);
+    checkRecordCountInTable(3);
+    writer.close();
+
+    checkRecordCountInTable(3);
+  }
+
+  private void checkRecordCountInTable(int expectedCount)
+          throws CommandNeedRetryException, IOException {
+    int count = TestUtil.listRecordsInTable(driver, dbName, tblName).size();
+    Assert.assertEquals(expectedCount, count);
+  }
+
+  /**
+   * Sets up the input fields in the same order as the table columns,
+   * and sets the serde separator to match the input field separator.
+   * @throws Exception
+   */
+  @Test
+  public void testInOrderWrite() throws Exception {
+    HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+    SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+    int timeout = 5000; // msec
+
+    HiveDelimitedTextSerializer serializer2 = new HiveDelimitedTextSerializer();
+    Context ctx = new Context();
+    ctx.put("serializer.fieldnames", COL1 + "," + COL2);
+    ctx.put("serializer.serdeSeparator", ",");
+    serializer2.configure(ctx);
+
+
+    HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool,
+            "flumetest", serializer2, sinkCounter);
+
+    SimpleEvent event = new SimpleEvent();
+    event.setBody("1,Hello world 1".getBytes());
+    writer.write(event);
+    event.setBody("2,Hello world 2".getBytes());
+    writer.write(event);
+    event.setBody("3,Hello world 3".getBytes());
+    writer.write(event);
+    writer.flush(false);
+    writer.close();
+  }
+
+  @Test
+  public void testSerdeSeparatorCharParsing() throws Exception {
+    HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+    SinkCounter sinkCounter = new SinkCounter(this.getClass().getName());
+    int timeout = 10000; // msec
+
+    // 1)  single character serdeSeparator
+    HiveDelimitedTextSerializer serializer1 = new HiveDelimitedTextSerializer();
+    Context ctx = new Context();
+    ctx.put("serializer.fieldnames", COL1 + "," + COL2);
+    ctx.put("serializer.serdeSeparator", ",");
+    serializer1.configure(ctx);
+    // should not throw
+
+
+    // 2) special character as serdeSeparator
+    HiveDelimitedTextSerializer serializer2 = new HiveDelimitedTextSerializer();
+    ctx = new Context();
+    ctx.put("serializer.fieldnames", COL1 + "," + COL2);
+    ctx.put("serializer.serdeSeparator", "'\t'");
+    serializer2.configure(ctx);
+    // should not throw
+
+
+    // 3) bad spec as serdeSeparator
+    HiveDelimitedTextSerializer serializer3 = new HiveDelimitedTextSerializer();
+    ctx = new Context();
+    ctx.put("serializer.fieldnames", COL1 + "," + COL2);
+    ctx.put("serializer.serdeSeparator", "ab");
+    try {
+      serializer3.configure(ctx);
+      Assert.fail("Bad serdeSeparator character was accepted");
+    } catch (Exception e) {
+      // expected: a multi-character separator should be rejected
+    }
+
+  }
+
+
+  @Test
+  public void testSecondWriterBeforeFirstCommits() throws Exception {
+    // here we open a new writer while the first is still writing (not committed)
+    HiveEndPoint endPoint1 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+    ArrayList<String> partVals2 = new ArrayList<String>(2);
+    partVals2.add(PART1_VALUE);
+    partVals2.add("Nepal");
+    HiveEndPoint endPoint2 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals2);
+
+    SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName());
+    SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName());
+
+    HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout
+            , callTimeoutPool, "flumetest", serializer, sinkCounter1);
+
+    writeEvents(writer1, 3);
+
+    HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout
+            , callTimeoutPool, "flumetest", serializer, sinkCounter2);
+    writeEvents(writer2, 3);
+    writer2.flush(false); // commit
+
+    writer1.flush(false); // commit
+    writer1.close();
+
+    writer2.close();
+  }
+
+
+  @Test
+  public void testSecondWriterAfterFirstCommits() throws Exception {
+    // here we open a new writer after the first writer has committed one txn
+    HiveEndPoint endPoint1 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals);
+    ArrayList<String> partVals2 = new ArrayList<String>(2);
+    partVals2.add(PART1_VALUE);
+    partVals2.add("Nepal");
+    HiveEndPoint endPoint2 = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals2);
+
+    SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName());
+    SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName());
+
+    HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout
+            , callTimeoutPool, "flumetest", serializer, sinkCounter1);
+
+    writeEvents(writer1, 3);
+
+    writer1.flush(false); // commit
+
+
+    HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout
+            , callTimeoutPool, "flumetest", serializer, sinkCounter2);
+    writeEvents(writer2, 3);
+    writer2.flush(false); // commit
+
+
+    writer1.close();
+    writer2.close();
+  }
+
+
+  private void writeEvents(HiveWriter writer, int count) throws InterruptedException, HiveWriter.WriteException {
+    SimpleEvent event = new SimpleEvent();
+    for (int i = 1; i <= count; i++) {
+      event.setBody((i + ",xyz,Hello world,abc").getBytes());
+      writer.write(event);
+    }
+  }
+}
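
Taken together, the tests above reduce to a single writer lifecycle: configure a serializer,
open a HiveWriter on a HiveEndPoint, write, flush, close. A condensed sketch under the same
assumptions the tests make (embedded metastore, a testing.alerts table as created by TestUtil,
and the constructor arguments used in testWriteBasic); the class name and thread-name format
are illustrative:

    package org.apache.flume.sink.hive;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import org.apache.flume.Context;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.instrumentation.SinkCounter;
    import org.apache.hive.hcatalog.streaming.HiveEndPoint;

    import java.util.Arrays;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class HiveWriterLifecycleSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService callTimeoutPool = Executors.newFixedThreadPool(1,
            new ThreadFactoryBuilder().setNameFormat("hive-writer-sketch").build());

        // in-order field-to-column mapping with a comma serde separator,
        // as in testInOrderWrite
        Context ctx = new Context();
        ctx.put("serializer.fieldnames", "id,msg");
        ctx.put("serializer.serdeSeparator", ",");
        HiveDelimitedTextSerializer serializer = new HiveDelimitedTextSerializer();
        serializer.configure(ctx);

        HiveEndPoint endPoint = new HiveEndPoint(null, "testing", "alerts",
            Arrays.asList("Asia", "India"));
        SinkCounter sinkCounter = new SinkCounter("hive-writer-sketch");

        // txnsPerBatch=10, autoCreatePartitions=true, callTimeout=10000 ms,
        // matching testWriteBasic
        HiveWriter writer = new HiveWriter(endPoint, 10, true, 10000,
            callTimeoutPool, "flumetest", serializer, sinkCounter);
        try {
          SimpleEvent event = new SimpleEvent();
          event.setBody("1,Hello world".getBytes());
          writer.write(event);
          writer.flush(true); // commit, same boolean the multi-flush test uses
        } finally {
          writer.close();
          callTimeoutPool.shutdown();
        }
      }
    }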