Posted to commits@phoenix.apache.org by jm...@apache.org on 2017/02/15 14:17:20 UTC

[2/2] phoenix git commit: PHOENIX-3135 Support loading csv data using Flume plugin (Kalyan Hadoop)

PHOENIX-3135 Support loading csv data using Flume plugin (Kalyan Hadoop)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/564e9b83
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/564e9b83
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/564e9b83

Branch: refs/heads/master
Commit: 564e9b83502e50a30ca0c5eefc6daf6ce8bad065
Parents: 18a86b6
Author: Josh Mahonin <jm...@gmail.com>
Authored: Wed Feb 15 09:12:08 2017 -0500
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Wed Feb 15 09:16:54 2017 -0500

----------------------------------------------------------------------
 phoenix-flume/pom.xml                           |   6 +-
 .../phoenix/flume/CsvEventSerializerIT.java     | 416 +++++++++++++++++++
 .../apache/phoenix/flume/FlumeConstants.java    |  14 +-
 .../flume/serializer/CsvEventSerializer.java    | 196 +++++++++
 .../flume/serializer/EventSerializers.java      |   4 +-
 5 files changed, 631 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
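
For reference, the new CSV serializer plugs into the existing PhoenixSink the
same way as the REGEX and JSON serializers. A minimal sketch of wiring it up
in Java, mirroring what CsvEventSerializerIT below does (the table name and
JDBC URL are placeholders):

    Context sinkContext = new Context();
    sinkContext.put(FlumeConstants.CONFIG_TABLE, "FLUME_CSV_TEST");
    sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, "jdbc:phoenix:localhost");
    sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.CSV.name());
    sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX
            + FlumeConstants.CONFIG_COLUMN_NAMES, "col1,col2,col3,col4");

    PhoenixSink sink = new PhoenixSink();
    Configurables.configure(sink, sinkContext);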


http://git-wip-us.apache.org/repos/asf/phoenix/blob/564e9b83/phoenix-flume/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml
index 5c4f197..279806f 100644
--- a/phoenix-flume/pom.xml
+++ b/phoenix-flume/pom.xml
@@ -178,7 +178,11 @@
       <artifactId>json-path</artifactId>
       <version>2.2.0</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-csv</artifactId>
+      <version>${commons-csv.version}</version>
+    </dependency>
     <!-- Main dependency on flume. The last to avoid using old commons-io in IT -->
     <dependency>
       <groupId>org.apache.flume</groupId>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/564e9b83/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java b/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
new file mode 100644
index 0000000..842db04
--- /dev/null
+++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.flume;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.flume.serializer.EventSerializers;
+import org.apache.phoenix.flume.sink.PhoenixSink;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class CsvEventSerializerIT extends BaseHBaseManagedTimeIT {
+
+	private Context sinkContext;
+	private PhoenixSink sink;
+
+	@Test
+	public void testWithDefaultDelimiters() throws EventDeliveryException, SQLException {
+
+		final String fullTableName = "FLUME_CSV_TEST";
+
+		String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+				+ "  (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]"
+				+ "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+		String columns = "col1,col2,col3,col4";
+		String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+		initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, null);
+
+		sink = new PhoenixSink();
+		Configurables.configure(sink, sinkContext);
+
+		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+		final Channel channel = this.initChannel();
+		sink.setChannel(channel);
+
+		sink.start();
+
+		final String eventBody = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
+		final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+		// put event in channel
+		Transaction transaction = channel.getTransaction();
+		transaction.begin();
+		channel.put(event);
+		transaction.commit();
+		transaction.close();
+
+		sink.process();
+
+		int rowsInDb = countRows(fullTableName);
+		assertEquals(1, rowsInDb);
+
+		sink.stop();
+		assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+		dropTable(fullTableName);
+	}
+
+	@Test
+	public void testKeyGenerator() throws EventDeliveryException, SQLException {
+
+		final String fullTableName = "FLUME_CSV_TEST";
+		initSinkContextWithDefaults(fullTableName);
+
+		sink = new PhoenixSink();
+		Configurables.configure(sink, sinkContext);
+
+		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+		final Channel channel = this.initChannel();
+		sink.setChannel(channel);
+
+		sink.start();
+		final String eventBody = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
+		final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+		// put event in channel
+		Transaction transaction = channel.getTransaction();
+		transaction.begin();
+		channel.put(event);
+		transaction.commit();
+		transaction.close();
+
+		sink.process();
+
+		int rowsInDb = countRows(fullTableName);
+		assertEquals(1, rowsInDb);
+
+		sink.stop();
+		assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+		dropTable(fullTableName);
+	}
+
+	@Test
+	public void testMismatchKeyGenerator() throws EventDeliveryException, SQLException {
+
+		final String fullTableName = "FLUME_CSV_TEST";
+		initSinkContextWithDefaults(fullTableName);
+		setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
+				DefaultKeyGenerator.UUID.name());
+
+		sink = new PhoenixSink();
+		Configurables.configure(sink, sinkContext);
+		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+		final Channel channel = this.initChannel();
+		sink.setChannel(channel);
+
+		sink.start();
+		final String eventBody = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
+		final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+		// put event in channel
+		Transaction transaction = channel.getTransaction();
+		transaction.begin();
+		channel.put(event);
+		transaction.commit();
+		transaction.close();
+
+		try {
+			sink.process();
+			fail();
+		} catch (Exception ex) {
+			assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException: Invalid format:"));
+		}
+
+		dropTable(fullTableName);
+	}
+
+	@Test
+	public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
+
+		final String fullTableName = "FLUME_CSV_TEST";
+		initSinkContextWithDefaults(fullTableName);
+
+		sink = new PhoenixSink();
+		Configurables.configure(sink, sinkContext);
+		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+		final Channel channel = this.initChannel();
+		sink.setChannel(channel);
+
+		sink.start();
+		final String eventBody = "kalyan,\"abc,pqr,xyz\",\"1,2,3,4\"";
+		final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+		// put event in channel
+		Transaction transaction = channel.getTransaction();
+		transaction.begin();
+		channel.put(event);
+		transaction.commit();
+		transaction.close();
+
+		sink.process();
+
+		int rowsInDb = countRows(fullTableName);
+		assertEquals(0, rowsInDb);
+
+		sink.stop();
+		assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+		dropTable(fullTableName);
+	}
+
+	@Test
+	public void testBatchEvents() throws EventDeliveryException, SQLException {
+
+		final String fullTableName = "FLUME_CSV_TEST";
+		initSinkContextWithDefaults(fullTableName);
+
+		sink = new PhoenixSink();
+		Configurables.configure(sink, sinkContext);
+		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+		final Channel channel = this.initChannel();
+		sink.setChannel(channel);
+
+		sink.start();
+		int numEvents = 150;
+		String col1 = "val1";
+		String a1 = "\"aaa,bbb,ccc\"";
+		String a2 = "\"1,2,3,4\"";
+		String eventBody = null;
+		List<Event> eventList = Lists.newArrayListWithCapacity(numEvents);
+		for (int i = 0; i < numEvents; i++) {
+			eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 + "," + a2;
+			Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+			eventList.add(event);
+		}
+
+		// put event in channel
+		Transaction transaction = channel.getTransaction();
+		transaction.begin();
+		for (Event event : eventList) {
+			channel.put(event);
+		}
+		transaction.commit();
+		transaction.close();
+
+		// the sink consumes at most one batch per process() call and the
+		// default batch size is smaller than numEvents, so process twice
+		// to drain the channel
+		sink.process();
+		sink.process();
+
+		int rowsInDb = countRows(fullTableName);
+		assertEquals(eventList.size(), rowsInDb);
+
+		sink.stop();
+		assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+		dropTable(fullTableName);
+	}
+
+	@Test
+	public void testEventsWithHeaders() throws Exception {
+
+		sinkContext = new Context();
+		final String fullTableName = "FLUME_CSV_TEST";
+		final String ddl = "CREATE TABLE IF NOT EXISTS "
+				+ fullTableName
+				+ "  (rowkey VARCHAR not null, col1 varchar , col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n"
+				+ "  CONSTRAINT pk PRIMARY KEY (rowkey))\n";
+		String columns = "col1,col2,col3,col4";
+		String rowkeyType = DefaultKeyGenerator.UUID.name();
+		String headers = "host,source";
+		initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, headers);
+
+		sink = new PhoenixSink();
+		Configurables.configure(sink, sinkContext);
+		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+		final Channel channel = this.initChannel();
+		sink.setChannel(channel);
+
+		sink.start();
+
+		int numEvents = 10;
+		String col1 = "val1";
+		String a1 = "\"aaa,bbb,ccc\"";
+		String a2 = "\"1,2,3,4\"";
+		String hostHeader = "host1";
+		String sourceHeader = "source1";
+		String eventBody = null;
+		List<Event> eventList = Lists.newArrayListWithCapacity(numEvents);
+		for (int i = 0; i < numEvents; i++) {
+			eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 + "," + a2;
+			Map<String, String> headerMap = Maps.newHashMapWithExpectedSize(2);
+			headerMap.put("host", hostHeader);
+			headerMap.put("source", sourceHeader);
+			Event event = EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap);
+			eventList.add(event);
+		}
+
+		// put event in channel
+		Transaction transaction = channel.getTransaction();
+		transaction.begin();
+		for (Event event : eventList) {
+			channel.put(event);
+		}
+		transaction.commit();
+		transaction.close();
+
+		sink.process();
+
+		final String query = " SELECT * FROM \n " + fullTableName;
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		final ResultSet rs;
+		final Connection conn = DriverManager.getConnection(getUrl(), props);
+		try {
+			rs = conn.createStatement().executeQuery(query);
+			assertTrue(rs.next());
+			assertEquals("host1", rs.getString("host"));
+			assertEquals("source1", rs.getString("source"));
+
+			assertTrue(rs.next());
+			assertEquals("host1", rs.getString("host"));
+			assertEquals("source1", rs.getString("source"));
+		} finally {
+			if (conn != null) {
+				conn.close();
+			}
+		}
+		sink.stop();
+		assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+		dropTable(fullTableName);
+	}
+
+	private Channel initChannel() {
+		// Channel configuration
+		Context channelContext = new Context();
+		channelContext.put("capacity", "10000");
+		channelContext.put("transactionCapacity", "200");
+
+		Channel channel = new MemoryChannel();
+		channel.setName("memorychannel");
+		Configurables.configure(channel, channelContext);
+		return channel;
+	}
+
+	private void initSinkContext(final String fullTableName, final String ddl, final String columns,
+			final String csvDelimiter, final String csvQuote, final String csvEscape, final String csvArrayDelimiter,
+			final String rowkeyType, final String headers) {
+		Preconditions.checkNotNull(fullTableName);
+		sinkContext = new Context();
+		sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
+		sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
+		sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.CSV.name());
+		sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
+		sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, columns);
+		if (null != csvDelimiter)
+			sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_DELIMITER, csvDelimiter);
+		if (null != csvQuote)
+			sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_QUOTE, csvQuote);
+		if (null != csvEscape)
+			sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_ESCAPE, csvEscape);
+		if (null != csvArrayDelimiter)
+			sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_ARRAY_DELIMITER,
+					csvArrayDelimiter);
+		if (null != rowkeyType)
+			sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
+					rowkeyType);
+		if (null != headers)
+			sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_HEADER_NAMES, headers);
+	}
+
+	private void initSinkContextWithDefaults(final String fullTableName) {
+		String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+				+ "  (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]"
+				+ "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+		String columns = "col1,col2,col3,col4";
+		String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+		initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, null);
+	}
+
+	private void setConfig(final String configName, final String configValue) {
+		Preconditions.checkNotNull(sinkContext);
+		Preconditions.checkNotNull(configName);
+		Preconditions.checkNotNull(configValue);
+		sinkContext.put(configName, configValue);
+	}
+
+	private int countRows(final String fullTableName) throws SQLException {
+		Preconditions.checkNotNull(fullTableName);
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		final Connection conn = DriverManager.getConnection(getUrl(), props);
+		ResultSet rs = null;
+		try {
+			rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+			int rowsCount = 0;
+			while (rs.next()) {
+				rowsCount = rs.getInt(1);
+			}
+			return rowsCount;
+
+		} finally {
+			if (rs != null) {
+				rs.close();
+			}
+			if (conn != null) {
+				conn.close();
+			}
+		}
+
+	}
+
+	private void dropTable(final String fullTableName) throws SQLException {
+		Preconditions.checkNotNull(fullTableName);
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		final Connection conn = DriverManager.getConnection(getUrl(), props);
+		try {
+			conn.createStatement().execute("drop table if exists " + fullTableName);
+		} finally {
+			if (conn != null) {
+				conn.close();
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/564e9b83/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
index f60d7dc..a146bbe 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
@@ -62,12 +62,22 @@ public final class FlumeConstants {
     /** Whether to ignore case when performing regex matches. */
     public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase";
     public static final boolean IGNORE_CASE_DEFAULT = false;
-    
+
     /** JSON expression used to parse groups from event data. */
     public static final String CONFIG_COLUMNS_MAPPING = "columnsMapping";
     public static final String CONFIG_PARTIAL_SCHEMA = "partialSchema";
     public static final String JSON_DEFAULT = "{}";
-    
+
+    /** CSV expression used to parse groups from event data. */
+    public static final String CSV_DELIMITER = "csvDelimiter";
+    public static final String CSV_DELIMITER_DEFAULT = ",";
+    public static final String CSV_QUOTE = "csvQuote";
+    public static final String CSV_QUOTE_DEFAULT = "\"";
+    public static final String CSV_ESCAPE = "csvEscape";
+    public static final String CSV_ESCAPE_DEFAULT = "\\";
+    public static final String CSV_ARRAY_DELIMITER = "csvArrayDelimiter";
+    public static final String CSV_ARRAY_DELIMITER_DEFAULT = ",";
+
     /** Comma separated list of column names . */
     public static final String CONFIG_COLUMN_NAMES = "columns";
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/564e9b83/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
new file mode 100644
index 0000000..1521084
--- /dev/null
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.flume.serializer;
+
+import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER_DEFAULT;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE_DEFAULT;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE_DEFAULT;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER_DEFAULT;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.sql.Array;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.phoenix.schema.types.PDataType;
+import org.json.JSONArray;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+public class CsvEventSerializer extends BaseEventSerializer {
+
+	private static final Logger logger = LoggerFactory.getLogger(CsvEventSerializer.class);
+
+	private String csvDelimiter;
+	private String csvQuote;
+	private String csvEscape;
+	private String csvArrayDelimiter;
+	private CsvLineParser csvLineParser;
+
+	/** Reads the CSV delimiter, quote, escape and array-delimiter settings from the serializer context. */
+	@Override
+	public void doConfigure(Context context) {
+		csvDelimiter = context.getString(CSV_DELIMITER, CSV_DELIMITER_DEFAULT);
+		csvQuote = context.getString(CSV_QUOTE, CSV_QUOTE_DEFAULT);
+		csvEscape = context.getString(CSV_ESCAPE, CSV_ESCAPE_DEFAULT);
+		csvArrayDelimiter = context.getString(CSV_ARRAY_DELIMITER, CSV_ARRAY_DELIMITER_DEFAULT);
+		csvLineParser = new CsvLineParser(csvDelimiter.toCharArray()[0], csvQuote.toCharArray()[0],
+				csvEscape.toCharArray()[0]);
+	}
+
+	/** No per-serializer initialization is required for CSV. */
+	@Override
+	public void doInitialize() throws SQLException {
+		// NO-OP
+	}
+
+	@Override
+	public void upsertEvents(List<Event> events) throws SQLException {
+		Preconditions.checkNotNull(events);
+		Preconditions.checkNotNull(connection);
+		Preconditions.checkNotNull(this.upsertStatement);
+
+		boolean wasAutoCommit = connection.getAutoCommit();
+		connection.setAutoCommit(false);
+		try (PreparedStatement colUpsert = connection.prepareStatement(upsertStatement)) {
+			String value = null;
+			Integer sqlType = null;
+			for (Event event : events) {
+				byte[] payloadBytes = event.getBody();
+				if (payloadBytes == null || payloadBytes.length == 0) {
+					continue;
+				}
+				String payload = new String(payloadBytes);
+				CSVRecord csvRecord = csvLineParser.parse(payload);
+				if (colNames.size() != csvRecord.size()) {
+					logger.debug("payload data {} doesn't match the fields mapping {} ", payload, colNames);
+					continue;
+				}
+				Map<String, String> data = new HashMap<String, String>();
+				for (int i = 0; i < csvRecord.size(); i++) {
+					data.put(colNames.get(i), csvRecord.get(i));
+				}
+				Collection<String> values = data.values();
+				if (values.contains(null)) {
+					logger.debug("payload data {} doesn't match the fields mapping {} ", payload, colNames);
+					continue;
+				}
+
+				int index = 1;
+				int offset = 0;
+				for (int i = 0; i < colNames.size(); i++, offset++) {
+					if (columnMetadata[offset] == null) {
+						continue;
+					}
+					String colName = colNames.get(i);
+					value = data.get(colName);
+					sqlType = columnMetadata[offset].getSqlType();
+					PDataType pDataType = PDataType.fromTypeId(sqlType);
+					Object upsertValue;
+					if (pDataType.isArrayType()) {
+						String arrayJson = Arrays.toString(value.split(csvArrayDelimiter));
+						JSONArray jsonArray = new JSONArray(new JSONTokener(arrayJson));
+						Object[] vals = new Object[jsonArray.length()];
+						for (int x = 0; x < jsonArray.length(); x++) {
+							vals[x] = jsonArray.get(x);
+						}
+						String baseTypeSqlName = PDataType.arrayBaseType(pDataType).getSqlTypeName();
+						Array array = connection.createArrayOf(baseTypeSqlName, vals);
+						upsertValue = pDataType.toObject(array, pDataType);
+					} else {
+						upsertValue = pDataType.toObject(value);
+					}
+					if (upsertValue != null) {
+						colUpsert.setObject(index++, upsertValue, sqlType);
+					} else {
+						colUpsert.setNull(index++, sqlType);
+					}
+				}
+
+				// add headers if necessary
+				Map<String, String> headerValues = event.getHeaders();
+				for (int i = 0; i < headers.size(); i++, offset++) {
+					String headerName = headers.get(i);
+					String headerValue = headerValues.get(headerName);
+					sqlType = columnMetadata[offset].getSqlType();
+					Object upsertValue = PDataType.fromTypeId(sqlType).toObject(headerValue);
+					if (upsertValue != null) {
+						colUpsert.setObject(index++, upsertValue, sqlType);
+					} else {
+						colUpsert.setNull(index++, sqlType);
+					}
+				}
+
+				if (autoGenerateKey) {
+					sqlType = columnMetadata[offset].getSqlType();
+					String generatedRowValue = this.keyGenerator.generate();
+					Object rowkeyValue = PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
+					colUpsert.setObject(index++, rowkeyValue, sqlType);
+				}
+				colUpsert.execute();
+			}
+			connection.commit();
+		} catch (Exception ex) {
+			logger.error("An error {} occurred while persisting events", ex.getMessage());
+			throw new SQLException(ex.getMessage());
+		} finally {
+			if (wasAutoCommit) {
+				connection.setAutoCommit(true);
+			}
+		}
+
+	}
+
+	static class CsvLineParser {
+		private final CSVFormat csvFormat;
+
+		CsvLineParser(char fieldDelimiter, char quote, char escape) {
+			this.csvFormat = CSVFormat.DEFAULT.withIgnoreEmptyLines(true).withDelimiter(fieldDelimiter)
+					.withEscape(escape).withQuote(quote);
+		}
+
+		public CSVRecord parse(String input) throws IOException {
+			CSVParser csvParser = new CSVParser(new StringReader(input), this.csvFormat);
+			return Iterables.getFirst(csvParser, null);
+		}
+	}
+
+}
\ No newline at end of file
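
To make the parsing behavior concrete, a small sketch of what CsvLineParser
returns for the sample record used in the IT, with the default delimiters and
the same commons-csv calls as above:

    CSVFormat format = CSVFormat.DEFAULT.withIgnoreEmptyLines(true)
            .withDelimiter(',').withEscape('\\').withQuote('"');
    CSVParser parser = new CSVParser(
            new StringReader("kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\""), format);
    CSVRecord record = Iterables.getFirst(parser, null);
    // record.size() == 4; record.get(2) is "abc,pqr,xyz", which upsertEvents
    // splits on csvArrayDelimiter to build the SQL array for col3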

http://git-wip-us.apache.org/repos/asf/phoenix/blob/564e9b83/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
index 68a609b..8c99d7d 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
@@ -19,7 +19,7 @@ package org.apache.phoenix.flume.serializer;
 
 public enum EventSerializers {
 
-    REGEX(RegexEventSerializer.class.getName()), JSON(JsonEventSerializer.class.getName());
+    REGEX(RegexEventSerializer.class.getName()), JSON(JsonEventSerializer.class.getName()), CSV(CsvEventSerializer.class.getName());
     
     private final String className;
     
@@ -33,4 +33,4 @@ public enum EventSerializers {
     public String getClassName() {
         return className;
     }
-}
+}
\ No newline at end of file