You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/06/03 02:02:53 UTC

git commit: FLUME-2063: Add Configurable charset to RegexHbaseEventSerializer

Updated Branches:
  refs/heads/trunk eefefa941 -> d63c378b5


FLUME-2063: Add Configurable charset to RegexHbaseEventSerializer

(Roman Shaposhnik via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d63c378b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d63c378b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d63c378b

Branch: refs/heads/trunk
Commit: d63c378b551bba338677d50a36d254dd8c62fe4a
Parents: eefefa9
Author: Brock Noland <br...@apache.org>
Authored: Sun Jun 2 17:02:19 2013 -0700
Committer: Brock Noland <br...@apache.org>
Committed: Sun Jun 2 17:02:19 2013 -0700

----------------------------------------------------------------------
 .../sink/hbase/RegexHbaseEventSerializer.java      |   18 ++++++++++----
 .../sink/hbase/TestRegexHbaseEventSerializer.java  |   18 ++++++++++-----
 2 files changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d63c378b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
index 0df559d..27974d9 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.sink.hbase;
 
+import java.nio.charset.Charset;
 import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +68,10 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   /** Whether to deposit event headers into corresponding column qualifiers */
   public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";
   public static final boolean DEPOSIT_HEADERS_DEFAULT = false;
+
+  /** Charset used both to decode the event body and to encode strings into HBase's byte arrays */
+  public static final String CHARSET_CONFIG = "charset";
+  public static final String CHARSET_DEFAULT = "UTF-8";
   
   /* This is a nonce used in HBase row-keys, such that the same row-key
    * never gets written more than once from within this JVM. */
@@ -80,6 +85,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   private boolean regexIgnoreCase;
   private boolean depositHeaders;
   private Pattern inputPattern;
+  private Charset charset;
   
   @Override
   public void configure(Context context) {
@@ -90,11 +96,13 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
         DEPOSIT_HEADERS_DEFAULT);
     inputPattern = Pattern.compile(regex, Pattern.DOTALL
         + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
+    charset = Charset.forName(context.getString(CHARSET_CONFIG,
+        CHARSET_DEFAULT));
     
     String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);
     String[] columnNames = colNameStr.split(",");
     for (String s: columnNames) { 
-      colNames.add(s.getBytes(Charsets.UTF_8));
+      colNames.add(s.getBytes(charset));
     }
   }
 
@@ -131,7 +139,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
      *  data loss. */
     String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(),
         randomKey, nonce.getAndIncrement());
-    return rowKey.getBytes(Charsets.UTF_8);
+    return rowKey.getBytes(charset);
   }
   
   protected byte[] getRowKey() {
@@ -142,7 +150,7 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
   public List<Row> getActions() throws FlumeException {
     List<Row> actions = Lists.newArrayList();
     byte[] rowKey;
-    Matcher m = inputPattern.matcher(new String(payload));
+    Matcher m = inputPattern.matcher(new String(payload, charset));
     if (!m.matches()) {
       return Lists.newArrayList();
     }
@@ -156,11 +164,11 @@ public class RegexHbaseEventSerializer implements HbaseEventSerializer {
       Put put = new Put(rowKey);
       
       for (int i = 0; i < colNames.size(); i++) {
-        put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
+        put.add(cf, colNames.get(i), m.group(i + 1).getBytes(charset));
       }
       if (depositHeaders) {
         for (Map.Entry<String, String> entry : headers.entrySet()) {
-          put.add(cf, entry.getKey().getBytes(Charsets.UTF_8), entry.getValue().getBytes(Charsets.UTF_8));
+          put.add(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset));
         }
       }
       actions.add(put);

http://git-wip-us.apache.org/repos/asf/flume/blob/d63c378b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
index 6cec36f..191dc54 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
@@ -18,11 +18,13 @@
  */
 package org.apache.flume.sink.hbase;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.nio.charset.Charset;
 import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
@@ -162,16 +164,19 @@ public class TestRegexHbaseEventSerializer {
    @Test
   /** Test depositing of the header information. */
   public void testDepositHeaders() throws Exception {
+    Charset charset = Charset.forName("KOI8-R");
     RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
     Context context = new Context();
     context.put(RegexHbaseEventSerializer.DEPOSIT_HEADERS_CONFIG,
         "true");
+    context.put(RegexHbaseEventSerializer.CHARSET_CONFIG,
+               charset.toString());
     s.configure(context);
 
     String body = "body";
     Map<String, String> headers = Maps.newHashMap();
     headers.put("header1", "value1");
-    headers.put("header2", "value2");
+    headers.put("заголовок2", "значение2");
 
     Event e = EventBuilder.withBody(Bytes.toBytes(body), headers);
     s.initialize(e, "CF".getBytes());
@@ -184,14 +189,15 @@ public class TestRegexHbaseEventSerializer {
     List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
     assertTrue(kvPairs.size() == 3);
 
-    Map<String, String> resultMap = Maps.newHashMap();
+    Map<String, byte[]> resultMap = Maps.newHashMap();
     for (KeyValue kv : kvPairs) {
-      resultMap.put(new String(kv.getQualifier()), new String(kv.getValue()));
+      resultMap.put(new String(kv.getQualifier(), charset), kv.getValue());
     }
 
-    assertEquals(body, resultMap.get("payload"));
-    assertEquals("value1", resultMap.get("header1"));
-    assertEquals("value2", resultMap.get("header2"));
+    assertEquals(body, new String(resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT), charset));
+    assertEquals("value1", new String(resultMap.get("header1"), charset));
+    assertArrayEquals("значение2".getBytes(charset), resultMap.get("заголовок2"));
+    assertEquals("значение2".length(), resultMap.get("заголовок2").length);
 
     List<Increment> increments = s.getIncrements();
     assertEquals(0, increments.size());