Posted to commits@phoenix.apache.org by jm...@apache.org on 2017/02/15 14:17:20 UTC
[2/2] phoenix git commit: PHOENIX-3135 Support loading csv data using Flume plugin (Kalyan Hadoop)
PHOENIX-3135 Support loading csv data using Flume plugin (Kalyan Hadoop)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/564e9b83
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/564e9b83
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/564e9b83
Branch: refs/heads/master
Commit: 564e9b83502e50a30ca0c5eefc6daf6ce8bad065
Parents: 18a86b6
Author: Josh Mahonin <jm...@gmail.com>
Authored: Wed Feb 15 09:12:08 2017 -0500
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Wed Feb 15 09:16:54 2017 -0500
----------------------------------------------------------------------
phoenix-flume/pom.xml | 6 +-
.../phoenix/flume/CsvEventSerializerIT.java | 416 +++++++++++++++++++
.../apache/phoenix/flume/FlumeConstants.java | 14 +-
.../flume/serializer/CsvEventSerializer.java | 196 +++++++++
.../flume/serializer/EventSerializers.java | 4 +-
5 files changed, 631 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/564e9b83/phoenix-flume/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml
index 5c4f197..279806f 100644
--- a/phoenix-flume/pom.xml
+++ b/phoenix-flume/pom.xml
@@ -178,7 +178,11 @@
<artifactId>json-path</artifactId>
<version>2.2.0</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-csv</artifactId>
+ <version>${commons-csv.version}</version>
+ </dependency>
<!-- Main dependency on flume. The last to avoid using old commons-io in IT -->
<dependency>
<groupId>org.apache.flume</groupId>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/564e9b83/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java b/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
new file mode 100644
index 0000000..842db04
--- /dev/null
+++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.flume;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.flume.serializer.EventSerializers;
+import org.apache.phoenix.flume.sink.PhoenixSink;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class CsvEventSerializerIT extends BaseHBaseManagedTimeIT {
+
+ private Context sinkContext;
+ private PhoenixSink sink;
+
+ @Test
+ public void testWithDefaultDelimiters() throws EventDeliveryException, SQLException {
+
+ final String fullTableName = "FLUME_CSV_TEST";
+
+ String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+ + " (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]"
+ + " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+ String columns = "col1,col2,col3,col4";
+ String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+ initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, null);
+
+ sink = new PhoenixSink();
+ Configurables.configure(sink, sinkContext);
+
+ assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+ final Channel channel = this.initChannel();
+ sink.setChannel(channel);
+
+ sink.start();
+
+ final String eventBody = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
+ final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+ // put event in channel
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ channel.put(event);
+ transaction.commit();
+ transaction.close();
+
+ sink.process();
+
+ int rowsInDb = countRows(fullTableName);
+ assertEquals(1, rowsInDb);
+
+ sink.stop();
+ assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+ dropTable(fullTableName);
+ }
+
+ @Test
+ public void testKeyGenerator() throws EventDeliveryException, SQLException {
+
+ final String fullTableName = "FLUME_CSV_TEST";
+ initSinkContextWithDefaults(fullTableName);
+
+ sink = new PhoenixSink();
+ Configurables.configure(sink, sinkContext);
+
+ assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+ final Channel channel = this.initChannel();
+ sink.setChannel(channel);
+
+ sink.start();
+ final String eventBody = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
+ final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+ // put event in channel
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ channel.put(event);
+ transaction.commit();
+ transaction.close();
+
+ sink.process();
+
+ int rowsInDb = countRows(fullTableName);
+ assertEquals(1, rowsInDb);
+
+ sink.stop();
+ assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+ dropTable(fullTableName);
+ }
+
+ @Test
+ public void testMismatchKeyGenerator() throws EventDeliveryException, SQLException {
+
+ final String fullTableName = "FLUME_CSV_TEST";
+ initSinkContextWithDefaults(fullTableName);
+ setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
+ DefaultKeyGenerator.UUID.name());
+
+ sink = new PhoenixSink();
+ Configurables.configure(sink, sinkContext);
+ assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+ final Channel channel = this.initChannel();
+ sink.setChannel(channel);
+
+ sink.start();
+ final String eventBody = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
+ final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+ // put event in channel
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ channel.put(event);
+ transaction.commit();
+ transaction.close();
+
+ try {
+ sink.process();
+ fail();
+ } catch (Exception ex) {
+ assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException: Invalid format:"));
+ }
+
+ dropTable(fullTableName);
+ }
+
+ @Test
+ public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
+
+ final String fullTableName = "FLUME_CSV_TEST";
+ initSinkContextWithDefaults(fullTableName);
+
+ sink = new PhoenixSink();
+ Configurables.configure(sink, sinkContext);
+ assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+ final Channel channel = this.initChannel();
+ sink.setChannel(channel);
+
+ sink.start();
+ final String eventBody = "kalyan,\"abc,pqr,xyz\",\"1,2,3,4\"";
+ final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+ // put event in channel
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ channel.put(event);
+ transaction.commit();
+ transaction.close();
+
+ sink.process();
+
+ int rowsInDb = countRows(fullTableName);
+ assertEquals(0, rowsInDb);
+
+ sink.stop();
+ assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+ dropTable(fullTableName);
+ }
+
+ @Test
+ public void testBatchEvents() throws EventDeliveryException, SQLException {
+
+ final String fullTableName = "FLUME_CSV_TEST";
+ initSinkContextWithDefaults(fullTableName);
+
+ sink = new PhoenixSink();
+ Configurables.configure(sink, sinkContext);
+ assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+ final Channel channel = this.initChannel();
+ sink.setChannel(channel);
+
+ sink.start();
+ int numEvents = 150;
+ String col1 = "val1";
+ String a1 = "\"aaa,bbb,ccc\"";
+ String a2 = "\"1,2,3,4\"";
+ String eventBody = null;
+ List<Event> eventList = Lists.newArrayListWithCapacity(numEvents);
+ for (int i = 0; i < numEvents; i++) {
+ eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 + "," + a2;
+ Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
+ eventList.add(event);
+ }
+
+ // put event in channel
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ for (Event event : eventList) {
+ channel.put(event);
+ }
+ transaction.commit();
+ transaction.close();
+
+ sink.process();
+
+ int rowsInDb = countRows(fullTableName);
+ assertEquals(eventList.size(), rowsInDb);
+
+ sink.stop();
+ assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+ dropTable(fullTableName);
+ }
+
+ @Test
+ public void testEventsWithHeaders() throws Exception {
+
+ sinkContext = new Context();
+ final String fullTableName = "FLUME_CSV_TEST";
+ final String ddl = "CREATE TABLE IF NOT EXISTS "
+ + fullTableName
+ + " (rowkey VARCHAR not null, col1 varchar , col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n"
+ + " CONSTRAINT pk PRIMARY KEY (rowkey))\n";
+ String columns = "col1,col2,col3,col4";
+ String rowkeyType = DefaultKeyGenerator.UUID.name();
+ String headers = "host,source";
+ initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, headers);
+
+ sink = new PhoenixSink();
+ Configurables.configure(sink, sinkContext);
+ assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
+
+ final Channel channel = this.initChannel();
+ sink.setChannel(channel);
+
+ sink.start();
+
+ int numEvents = 10;
+ String col1 = "val1";
+ String a1 = "\"aaa,bbb,ccc\"";
+ String a2 = "\"1,2,3,4\"";
+ String hostHeader = "host1";
+ String sourceHeader = "source1";
+ String eventBody = null;
+ List<Event> eventList = Lists.newArrayListWithCapacity(numEvents);
+ for (int i = 0; i < numEvents; i++) {
+ eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 + "," + a2;
+ Map<String, String> headerMap = Maps.newHashMapWithExpectedSize(2);
+ headerMap.put("host", hostHeader);
+ headerMap.put("source", sourceHeader);
+ Event event = EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap);
+ eventList.add(event);
+ }
+
+ // put event in channel
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ for (Event event : eventList) {
+ channel.put(event);
+ }
+ transaction.commit();
+ transaction.close();
+
+ sink.process();
+
+ final String query = " SELECT * FROM \n " + fullTableName;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ final ResultSet rs;
+ final Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("host1", rs.getString("host"));
+ assertEquals("source1", rs.getString("source"));
+
+ assertTrue(rs.next());
+ assertEquals("host1", rs.getString("host"));
+ assertEquals("source1", rs.getString("source"));
+ } finally {
+ if (conn != null) {
+ conn.close();
+ }
+ }
+ sink.stop();
+ assertEquals(LifecycleState.STOP, sink.getLifecycleState());
+
+ dropTable(fullTableName);
+ }
+
+ private Channel initChannel() {
+ // Channel configuration
+ Context channelContext = new Context();
+ channelContext.put("capacity", "10000");
+ channelContext.put("transactionCapacity", "200");
+
+ Channel channel = new MemoryChannel();
+ channel.setName("memorychannel");
+ Configurables.configure(channel, channelContext);
+ return channel;
+ }
+
+ private void initSinkContext(final String fullTableName, final String ddl, final String columns,
+ final String csvDelimiter, final String csvQuote, final String csvEscape, final String csvArrayDelimiter,
+ final String rowkeyType, final String headers) {
+ Preconditions.checkNotNull(fullTableName);
+ sinkContext = new Context();
+ sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
+ sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
+ sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.CSV.name());
+ sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
+ sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, columns);
+ if (null != csvDelimiter)
+ sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_DELIMITER, csvDelimiter);
+ if (null != csvQuote)
+ sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_QUOTE, csvQuote);
+ if (null != csvEscape)
+ sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_ESCAPE, csvEscape);
+ if (null != csvArrayDelimiter)
+ sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_ARRAY_DELIMITER,
+ csvArrayDelimiter);
+ if (null != rowkeyType)
+ sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
+ rowkeyType);
+ if (null != headers)
+ sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_HEADER_NAMES, headers);
+ }
+
+ private void initSinkContextWithDefaults(final String fullTableName) {
+ String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
+ + " (flume_time timestamp not null, col1 varchar , col2 double, col3 varchar[], col4 integer[]"
+ + " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
+ String columns = "col1,col2,col3,col4";
+ String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
+ initSinkContext(fullTableName, ddl, columns, null, null, null, null, rowkeyType, null);
+ }
+
+ private void setConfig(final String configName, final String configValue) {
+ Preconditions.checkNotNull(sinkContext);
+ Preconditions.checkNotNull(configName);
+ Preconditions.checkNotNull(configValue);
+ sinkContext.put(configName, configValue);
+ }
+
+ private int countRows(final String fullTableName) throws SQLException {
+ Preconditions.checkNotNull(fullTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ final Connection conn = DriverManager.getConnection(getUrl(), props);
+ ResultSet rs = null;
+ try {
+ rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ int rowsCount = 0;
+ while (rs.next()) {
+ rowsCount = rs.getInt(1);
+ }
+ return rowsCount;
+
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
+ if (conn != null) {
+ conn.close();
+ }
+ }
+
+ }
+
+ private void dropTable(final String fullTableName) throws SQLException {
+ Preconditions.checkNotNull(fullTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ final Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ conn.createStatement().execute("drop table if exists " + fullTableName);
+ } finally {
+ if (conn != null) {
+ conn.close();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/564e9b83/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
index f60d7dc..a146bbe 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
@@ -62,12 +62,22 @@ public final class FlumeConstants {
/** Whether to ignore case when performing regex matches. */
public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase";
public static final boolean IGNORE_CASE_DEFAULT = false;
-
+
/** JSON expression used to parse groups from event data. */
public static final String CONFIG_COLUMNS_MAPPING = "columnsMapping";
public static final String CONFIG_PARTIAL_SCHEMA = "partialSchema";
public static final String JSON_DEFAULT = "{}";
-
+
+ /** CSV configuration used to parse fields from event data. */
+ public static final String CSV_DELIMITER = "csvDelimiter";
+ public static final String CSV_DELIMITER_DEFAULT = ",";
+ public static final String CSV_QUOTE = "csvQuote";
+ public static final String CSV_QUOTE_DEFAULT = "\"";
+ public static final String CSV_ESCAPE = "csvEscape";
+ public static final String CSV_ESCAPE_DEFAULT = "\\";
+ public static final String CSV_ARRAY_DELIMITER = "csvArrayDelimiter";
+ public static final String CSV_ARRAY_DELIMITER_DEFAULT = ",";
+
/** Comma separated list of column names . */
public static final String CONFIG_COLUMN_NAMES = "columns";
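For reference, a minimal sketch of wiring the new CSV serializer options into a sink context, mirroring initSinkContext() in CsvEventSerializerIT above. The table name, JDBC URL argument, column list, and the pipe array delimiter are illustrative values, not defaults from this patch:

import org.apache.flume.Context;
import org.apache.flume.conf.Configurables;
import org.apache.phoenix.flume.FlumeConstants;
import org.apache.phoenix.flume.serializer.EventSerializers;
import org.apache.phoenix.flume.sink.PhoenixSink;

public class CsvSinkConfigSketch {
    public static PhoenixSink configure(String jdbcUrl) {
        Context ctx = new Context();
        ctx.put(FlumeConstants.CONFIG_TABLE, "FLUME_CSV_TEST"); // illustrative
        ctx.put(FlumeConstants.CONFIG_JDBC_URL, jdbcUrl);
        ctx.put(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.CSV.name());
        ctx.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES,
                "col1,col2,col3,col4");
        // Only non-default options need setting; the defaults above are ","
        // (delimiter), "\"" (quote), "\\" (escape) and "," (array delimiter).
        ctx.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CSV_ARRAY_DELIMITER, "|");
        PhoenixSink sink = new PhoenixSink();
        Configurables.configure(sink, ctx);
        return sink;
    }
}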
http://git-wip-us.apache.org/repos/asf/phoenix/blob/564e9b83/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
new file mode 100644
index 0000000..1521084
--- /dev/null
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.flume.serializer;
+
+import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER_DEFAULT;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE_DEFAULT;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE_DEFAULT;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER;
+import static org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER_DEFAULT;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.sql.Array;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.phoenix.schema.types.PDataType;
+import org.json.JSONArray;
+import org.json.JSONTokener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+public class CsvEventSerializer extends BaseEventSerializer {
+
+ private static final Logger logger = LoggerFactory.getLogger(CsvEventSerializer.class);
+
+ private String csvDelimiter;
+ private String csvQuote;
+ private String csvEscape;
+ private String csvArrayDelimiter;
+ private CsvLineParser csvLineParser;
+
+ /** Reads the CSV delimiter, quote, escape and array delimiter from the serializer context, falling back to the FlumeConstants defaults. */
+ @Override
+ public void doConfigure(Context context) {
+ csvDelimiter = context.getString(CSV_DELIMITER, CSV_DELIMITER_DEFAULT);
+ csvQuote = context.getString(CSV_QUOTE, CSV_QUOTE_DEFAULT);
+ csvEscape = context.getString(CSV_ESCAPE, CSV_ESCAPE_DEFAULT);
+ csvArrayDelimiter = context.getString(CSV_ARRAY_DELIMITER, CSV_ARRAY_DELIMITER_DEFAULT);
+ csvLineParser = new CsvLineParser(csvDelimiter.toCharArray()[0], csvQuote.toCharArray()[0],
+ csvEscape.toCharArray()[0]);
+ }
+
+ /** No CSV-specific initialization is needed. */
+ @Override
+ public void doInitialize() throws SQLException {
+ // NO-OP
+ }
+
+ @Override
+ public void upsertEvents(List<Event> events) throws SQLException {
+ Preconditions.checkNotNull(events);
+ Preconditions.checkNotNull(connection);
+ Preconditions.checkNotNull(this.upsertStatement);
+
+ boolean wasAutoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
+ try (PreparedStatement colUpsert = connection.prepareStatement(upsertStatement)) {
+ String value = null;
+ Integer sqlType = null;
+ for (Event event : events) {
+ byte[] payloadBytes = event.getBody();
+ if (payloadBytes == null || payloadBytes.length == 0) {
+ continue;
+ }
+ String payload = new String(payloadBytes);
+ CSVRecord csvRecord = csvLineParser.parse(payload);
+ if (colNames.size() != csvRecord.size()) {
+ logger.debug("payload data {} doesn't match the fields mapping {} ", payload, colNames);
+ continue;
+ }
+ Map<String, String> data = new HashMap<String, String>();
+ for (int i = 0; i < csvRecord.size(); i++) {
+ data.put(colNames.get(i), csvRecord.get(i));
+ }
+ Collection<String> values = data.values();
+ if (values.contains(null)) {
+ logger.debug("payload data {} doesn't match the fields mapping {} ", payload, colNames);
+ continue;
+ }
+
+ int index = 1;
+ int offset = 0;
+ for (int i = 0; i < colNames.size(); i++, offset++) {
+ if (columnMetadata[offset] == null) {
+ continue;
+ }
+ String colName = colNames.get(i);
+ value = data.get(colName);
+ sqlType = columnMetadata[offset].getSqlType();
+ PDataType pDataType = PDataType.fromTypeId(sqlType);
+ Object upsertValue;
+ if (pDataType.isArrayType()) {
+ String arrayJson = Arrays.toString(value.split(csvArrayDelimiter));
+ JSONArray jsonArray = new JSONArray(new JSONTokener(arrayJson));
+ Object[] vals = new Object[jsonArray.length()];
+ for (int x = 0; x < jsonArray.length(); x++) {
+ vals[x] = jsonArray.get(x);
+ }
+ String baseTypeSqlName = PDataType.arrayBaseType(pDataType).getSqlTypeName();
+ Array array = connection.createArrayOf(baseTypeSqlName, vals);
+ upsertValue = pDataType.toObject(array, pDataType);
+ } else {
+ upsertValue = pDataType.toObject(value);
+ }
+ if (upsertValue != null) {
+ colUpsert.setObject(index++, upsertValue, sqlType);
+ } else {
+ colUpsert.setNull(index++, sqlType);
+ }
+ }
+
+ // add headers if necessary
+ Map<String, String> headerValues = event.getHeaders();
+ for (int i = 0; i < headers.size(); i++, offset++) {
+ String headerName = headers.get(i);
+ String headerValue = headerValues.get(headerName);
+ sqlType = columnMetadata[offset].getSqlType();
+ Object upsertValue = PDataType.fromTypeId(sqlType).toObject(headerValue);
+ if (upsertValue != null) {
+ colUpsert.setObject(index++, upsertValue, sqlType);
+ } else {
+ colUpsert.setNull(index++, sqlType);
+ }
+ }
+
+ if (autoGenerateKey) {
+ sqlType = columnMetadata[offset].getSqlType();
+ String generatedRowValue = this.keyGenerator.generate();
+ Object rowkeyValue = PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
+ colUpsert.setObject(index++, rowkeyValue, sqlType);
+ }
+ colUpsert.execute();
+ }
+ connection.commit();
+ } catch (Exception ex) {
+ logger.error("An error {} occurred during persisting the event ", ex.getMessage());
+ throw new SQLException(ex.getMessage());
+ } finally {
+ if (wasAutoCommit) {
+ connection.setAutoCommit(true);
+ }
+ }
+
+ }
+
+ static class CsvLineParser {
+ private final CSVFormat csvFormat;
+
+ CsvLineParser(char fieldDelimiter, char quote, char escape) {
+ this.csvFormat = CSVFormat.DEFAULT.withIgnoreEmptyLines(true).withDelimiter(fieldDelimiter)
+ .withEscape(escape).withQuote(quote);
+ }
+
+ public CSVRecord parse(String input) throws IOException {
+ CSVParser csvParser = new CSVParser(new StringReader(input), this.csvFormat);
+ return ((CSVRecord) Iterables.getFirst(csvParser, null));
+ }
+ }
+
+}
\ No newline at end of file
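To make the quoting behaviour concrete, here is a small self-contained sketch of what CsvLineParser produces with the default format; the class name and sample line are illustrative, while the CSVFormat settings mirror the constructor above:

import java.io.StringReader;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class CsvLineParseSketch {
    public static void main(String[] args) throws Exception {
        // The same format CsvLineParser builds from the FlumeConstants defaults.
        CSVFormat format = CSVFormat.DEFAULT.withIgnoreEmptyLines(true)
                .withDelimiter(',').withQuote('"').withEscape('\\');
        String line = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
        try (CSVParser parser = new CSVParser(new StringReader(line), format)) {
            CSVRecord record = parser.iterator().next();
            System.out.println(record.size()); // 4 -- quoted fields stay whole
            System.out.println(record.get(2)); // abc,pqr,xyz
        }
    }
}

Because the quote character keeps embedded commas intact, array columns arrive as a single field and are only split on csvArrayDelimiter inside upsertEvents().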
http://git-wip-us.apache.org/repos/asf/phoenix/blob/564e9b83/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
----------------------------------------------------------------------
diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
index 68a609b..8c99d7d 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
@@ -19,7 +19,7 @@ package org.apache.phoenix.flume.serializer;
public enum EventSerializers {
- REGEX(RegexEventSerializer.class.getName()), JSON(JsonEventSerializer.class.getName());
+ REGEX(RegexEventSerializer.class.getName()), JSON(JsonEventSerializer.class.getName()), CSV(CsvEventSerializer.class.getName());
private final String className;
@@ -33,4 +33,4 @@ public enum EventSerializers {
public String getClassName() {
return className;
}
-}
+}
\ No newline at end of file
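Finally, a trivial sketch showing that the new enum constant resolves the CSV serializer's class name just as REGEX and JSON do; the reflective instantiation hinted at in the comment is an assumption about how PhoenixSink consumes getClassName(), not code from this patch:

import org.apache.phoenix.flume.serializer.EventSerializers;

public class SerializerEnumSketch {
    public static void main(String[] args) {
        // A sink configured with serializer = CSV can look up the class name
        // and (presumably) instantiate it reflectively.
        System.out.println(EventSerializers.CSV.getClassName());
        // -> org.apache.phoenix.flume.serializer.CsvEventSerializer
    }
}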