You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@flume.apache.org by br...@apache.org on 2013/06/03 02:02:53 UTC
git commit: FLUME-2063: Add Configurable charset to RegexHbaseEventSerializer
Updated Branches:
refs/heads/trunk eefefa941 -> d63c378b5
FLUME-2063: Add Configurable charset to RegexHbaseEventSerializer
(Roman Shaposhnik via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d63c378b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d63c378b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d63c378b
Branch: refs/heads/trunk
Commit: d63c378b551bba338677d50a36d254dd8c62fe4a
Parents: eefefa9
Author: Brock Noland <br...@apache.org>
Authored: Sun Jun 2 17:02:19 2013 -0700
Committer: Brock Noland <br...@apache.org>
Committed: Sun Jun 2 17:02:19 2013 -0700
----------------------------------------------------------------------
.../sink/hbase/RegexHbaseEventSerializer.java | 18 ++++++++++----
.../sink/hbase/TestRegexHbaseEventSerializer.java | 18 ++++++++++-----
2 files changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/d63c378b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
index 0df559d..27974d9 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
@@ -18,6 +18,7 @@
*/
package org.apache.flume.sink.hbase;
+import java.nio.charset.Charset;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
@@ -67,6 +68,10 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
/** Whether to deposit event headers into corresponding column qualifiers */
public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";
public static final boolean DEPOSIT_HEADERS_DEFAULT = false;
+
+ /** What charset to use when serializing into HBase's byte arrays */
+ public static final String CHARSET_CONFIG = "charset";
+ public static final String CHARSET_DEFAULT = "UTF-8";
/* This is a nonce used in HBase row-keys, such that the same row-key
* never gets written more than once from within this JVM. */
@@ -80,6 +85,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
private boolean regexIgnoreCase;
private boolean depositHeaders;
private Pattern inputPattern;
+ private Charset charset;
@Override
public void configure(Context context) {
@@ -90,11 +96,13 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
DEPOSIT_HEADERS_DEFAULT);
inputPattern = Pattern.compile(regex, Pattern.DOTALL
+ (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
+ charset = Charset.forName(context.getString(CHARSET_CONFIG,
+ CHARSET_DEFAULT));
String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);
String[] columnNames = colNameStr.split(",");
for (String s: columnNames) {
- colNames.add(s.getBytes(Charsets.UTF_8));
+ colNames.add(s.getBytes(charset));
}
}
@@ -131,7 +139,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
* data loss. */
String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(),
randomKey, nonce.getAndIncrement());
- return rowKey.getBytes(Charsets.UTF_8);
+ return rowKey.getBytes(charset);
}
protected byte[] getRowKey() {
@@ -142,7 +150,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
public List<Row> getActions() throws FlumeException {
List<Row> actions = Lists.newArrayList();
byte[] rowKey;
- Matcher m = inputPattern.matcher(new String(payload));
+ Matcher m = inputPattern.matcher(new String(payload, charset));
if (!m.matches()) {
return Lists.newArrayList();
}
@@ -156,11 +164,11 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
Put put = new Put(rowKey);
for (int i = 0; i < colNames.size(); i++) {
- put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
+ put.add(cf, colNames.get(i), m.group(i + 1).getBytes(charset));
}
if (depositHeaders) {
for (Map.Entry<String, String> entry : headers.entrySet()) {
- put.add(cf, entry.getKey().getBytes(Charsets.UTF_8), entry.getValue().getBytes(Charsets.UTF_8));
+ put.add(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset));
}
}
actions.add(put);
http://git-wip-us.apache.org/repos/asf/flume/blob/d63c378b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
index 6cec36f..191dc54 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
@@ -18,11 +18,13 @@
*/
package org.apache.flume.sink.hbase;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.nio.charset.Charset;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
@@ -162,16 +164,19 @@ public class TestRegexHbaseEventSerializer {
@Test
/** Test depositing of the header information. */
public void testDepositHeaders() throws Exception {
+ Charset charset = Charset.forName("KOI8-R");
RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
Context context = new Context();
context.put(RegexHbaseEventSerializer.DEPOSIT_HEADERS_CONFIG,
"true");
+ context.put(RegexHbaseEventSerializer.CHARSET_CONFIG,
+ charset.toString());
s.configure(context);
String body = "body";
Map<String, String> headers = Maps.newHashMap();
headers.put("header1", "value1");
- headers.put("header2", "value2");
+ headers.put("заголовок2", "значение2");
Event e = EventBuilder.withBody(Bytes.toBytes(body), headers);
s.initialize(e, "CF".getBytes());
@@ -184,14 +189,15 @@ public class TestRegexHbaseEventSerializer {
List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
assertTrue(kvPairs.size() == 3);
- Map<String, String> resultMap = Maps.newHashMap();
+ Map<String, byte[]> resultMap = Maps.newHashMap();
for (KeyValue kv : kvPairs) {
- resultMap.put(new String(kv.getQualifier()), new String(kv.getValue()));
+ resultMap.put(new String(kv.getQualifier(), charset), kv.getValue());
}
- assertEquals(body, resultMap.get("payload"));
- assertEquals("value1", resultMap.get("header1"));
- assertEquals("value2", resultMap.get("header2"));
+ assertEquals(body, new String(resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT), charset));
+ assertEquals("value1", new String(resultMap.get("header1"), charset));
+ assertArrayEquals("значение2".getBytes(charset), resultMap.get("заголовок2"));
+ assertEquals("значение2".length(), resultMap.get("заголовок2").length);
List<Increment> increments = s.getIncrements();
assertEquals(0, increments.size());