You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/05/23 01:24:09 UTC
[11/13] apex-malhar git commit: Interceptor and hdfs test source
Interceptor and hdfs test source
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2cfe153c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2cfe153c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2cfe153c
Branch: refs/heads/master
Commit: 2cfe153c98dd05b8185beaf58d6db8c4b7ec9408
Parents: 4432651
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Sun Feb 19 21:34:49 2017 +0530
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon May 22 16:47:34 2017 -0700
----------------------------------------------------------------------
.../ColumnFilteringFormattingInterceptor.java | 228 +++++++++++++++++++
.../flume/source/HdfsTestSource.java | 222 ++++++++++++++++++
...olumnFilteringFormattingInterceptorTest.java | 134 +++++++++++
3 files changed, 584 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cfe153c/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java
new file mode 100644
index 0000000..ce92f6d
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java
@@ -0,0 +1,228 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+
+import static com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER;
+import static com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.SRC_SEPARATOR;
+import static com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.SRC_SEPARATOR_DFLT;
+
+/**
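+ * <p>Flume interceptor that extracts selected columns from a separator-delimited event body and
+ * rewrites the body according to a format such as <code>{1},{2};{3}</code>.</p>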
+ *
+ * @author Chandni Singh <ch...@datatorrent.com>
+ * @since 0.9.4
+ */
+public class ColumnFilteringFormattingInterceptor implements Interceptor
+{
+ /** Byte that delimits columns in the incoming event body. */
+ private final byte srcSeparator;
+ /** Output separators; entry i is written right after the i-th entry of {@link #columns}. */
+ private final byte[][] dstSeparators;
+ /** Bytes written at the very start of every rewritten body; may be empty. */
+ private final byte[] prefix;
+ /** Largest value found in {@link #columns}. */
+ private final int maxIndex;
+ /** Always {@code maxIndex + 1}. */
+ private final int maxColumn;
+ /** Source column indexes to emit, in output order (index 0 is the first column of the body). */
+ private final int[] columns;
+ /** Scratch buffer of column start offsets, {@code maxColumn + 1} entries long and reused for every event. */
+ private final int[] positions;
+
+ /**
+  * Creates an interceptor; instances are obtained through {@link Builder#build()}.
+  *
+  * @param columns source column indexes to emit, in output order
+  * @param srcSeparator byte delimiting columns in the incoming body
+  * @param dstSeparators bytes to write after each emitted column, one entry per column
+  * @param prefix bytes to write before the first emitted column
+  */
+ private ColumnFilteringFormattingInterceptor(int[] columns, byte srcSeparator, byte[][] dstSeparators, byte[] prefix)
+ {
+   this.srcSeparator = srcSeparator;
+   this.dstSeparators = dstSeparators;
+   this.prefix = prefix;
+   this.columns = columns;
+
+   /* the scratch buffer needs one slot per column up to the highest one, plus one for its end */
+   int highest = Integer.MIN_VALUE;
+   for (int idx = 0; idx < columns.length; idx++) {
+     highest = Math.max(highest, columns[idx]);
+   }
+   this.maxIndex = highest;
+   this.maxColumn = highest + 1;
+   this.positions = new int[highest + 2];
+ }
+
+ /**
+  * Does nothing; all state of this interceptor is set up by its constructor.
+  */
+ @Override
+ public void initialize()
+ {
+ /* no-op */
+ }
+
+ /**
+  * Replaces the body of the event with
+  * {@code prefix + column + separator + column + separator ...}, using the configured columns in
+  * their configured order. A column that the body does not contain, or that is empty, contributes
+  * no bytes, but its output separator is still written.
+  *
+  * <p>The method works on the shared {@link #positions} scratch buffer, so concurrent calls on the
+  * same instance would interfere with each other.</p>
+  *
+  * @param event event whose body is rewritten in place
+  * @return the same event instance; it is returned untouched when its body is {@code null}
+  */
+ @Override
+ public Event intercept(Event event)
+ {
+   byte[] body = event.getBody();
+   if (body == null) {
+     return event;
+   }
+
+   final int length = body.length;
+
+   /* positions[n] = offset of the first byte of column n, i.e. the byte after the n-th separator */
+   int i = 0;
+   int index = 0;
+   while (i < length) {
+     if (body[i++] == srcSeparator) {
+       positions[++index] = i;
+       /*
+        * positions[maxColumn] is where the highest requested column ends. Stopping one separator
+        * earlier would leave that end unknown and the column would be emitted empty.
+        */
+       if (index >= maxColumn) {
+         break;
+       }
+     }
+   }
+
+   /* nextVirginIndex = first slot of positions that this event did not write to */
+   int nextVirginIndex;
+   if (i == length && index < maxColumn) {
+     /* the body ended before maxColumn separators were seen: the last column ends with the body */
+     nextVirginIndex = index + 2;
+     positions[nextVirginIndex - 1] = length;
+   } else {
+     nextVirginIndex = index + 1;
+   }
+
+   try {
+     int newArrayLen = prefix.length;
+     for (int n = 0; n < columns.length; n++) {
+       newArrayLen += columnLength(body, columns[n]) + dstSeparators[n].length;
+     }
+
+     byte[] newBody = new byte[newArrayLen];
+     System.arraycopy(prefix, 0, newBody, 0, prefix.length);
+     int newOffset = prefix.length;
+     for (int n = 0; n < columns.length; n++) {
+       int len = columnLength(body, columns[n]);
+       if (len > 0) {
+         /* copy exactly the column content; the terminating source separator is never copied */
+         System.arraycopy(body, positions[columns[n]], newBody, newOffset, len);
+         newOffset += len;
+       }
+       byte[] separator = dstSeparators[n];
+       System.arraycopy(separator, 0, newBody, newOffset, separator.length);
+       newOffset += separator.length;
+     }
+     event.setBody(newBody);
+   } finally {
+     /* always hand a clean scratch buffer to the next event; slot 0 is never written and stays 0 */
+     Arrays.fill(positions, 1, nextVirginIndex, 0);
+   }
+   return event;
+ }
+
+ /**
+  * Returns the number of content bytes of a column, excluding its terminating source separator.
+  * Relies on {@link #positions} having been filled for the given body.
+  *
+  * @param body event body the offsets refer to
+  * @param column index of the column
+  * @return content length in bytes; 0 when the column is empty or not present in the body
+  */
+ private int columnLength(byte[] body, int column)
+ {
+   int start = positions[column];
+   int end = positions[column + 1];
+   if (end <= start) {
+     /* unset slots are 0, so a column missing from the body lands here as well */
+     return 0;
+   }
+   /* only the last, unterminated column of a body does not end in a separator */
+   return body[end - 1] == srcSeparator ? end - start - 1 : end - start;
+ }
+
+ /**
+  * Runs {@link #intercept(Event)} on each event of the list, in iteration order.
+  *
+  * @param events events to rewrite in place
+  * @return the list that was passed in
+  */
+ @Override
+ public List<Event> intercept(List<Event> events)
+ {
+   java.util.Iterator<Event> remaining = events.iterator();
+   while (remaining.hasNext()) {
+     intercept(remaining.next());
+   }
+   return events;
+ }
+
+ /**
+  * Does nothing.
+  */
+ @Override
+ public void close()
+ {
+ /* no-op */
+ }
+
+ /**
+  * Builds {@link ColumnFilteringFormattingInterceptor} instances from a Flume context.
+  *
+  * <p>The format read from {@code COLUMNS_FORMATTER} is a string with <code>{n}</code>
+  * placeholders, n being a column index. Text before the first placeholder becomes the output
+  * prefix and the text following a placeholder becomes the separator written after that column.
+  * The source separator is read from {@code SRC_SEPARATOR} as an integer byte value.</p>
+  */
+ public static class Builder implements Interceptor.Builder
+ {
+   /** Matches one column placeholder, for example <code>{12}</code>. */
+   private static final Pattern COLUMN_PATTERN = Pattern.compile("\\{\\d+?\\}");
+
+   private int[] columns;
+   private byte srcSeparator;
+   private byte[][] dstSeparators;
+   private byte[] prefix;
+
+   /**
+    * @return a new interceptor using the settings read by {@link #configure(Context)}
+    * @throws IllegalStateException if {@link #configure(Context)} has not been called successfully
+    */
+   @Override
+   public Interceptor build()
+   {
+     if (columns == null) {
+       throw new IllegalStateException("configure(Context) must be called before build()");
+     }
+     return new ColumnFilteringFormattingInterceptor(columns, srcSeparator, dstSeparators, prefix);
+   }
+
+   /**
+    * Parses the columns format and the source separator from the context.
+    *
+    * @param context Flume context holding the interceptor settings
+    * @throws IllegalArgumentException if the format is missing, empty or references no column
+    */
+   @Override
+   public void configure(Context context)
+   {
+     String formatter = context.getString(COLUMNS_FORMATTER);
+     if (Strings.isNullOrEmpty(formatter)) {
+       throw new IllegalArgumentException("This interceptor requires columns format to be specified!");
+     }
+     List<String> lSeparators = Lists.newArrayList();
+     List<Integer> lColumns = Lists.newArrayList();
+     Matcher matcher = COLUMN_PATTERN.matcher(formatter);
+     int separatorStart = 0;
+     String lPrefix = "";
+     while (matcher.find()) {
+       String col = matcher.group();
+       /* strip the surrounding braces */
+       lColumns.add(Integer.parseInt(col.substring(1, col.length() - 1)));
+       if (separatorStart == 0 && matcher.start() > 0) {
+         /* text in front of the first placeholder */
+         lPrefix = formatter.substring(0, matcher.start());
+       } else if (separatorStart > 0) {
+         /* text between the previous placeholder and this one */
+         lSeparators.add(formatter.substring(separatorStart, matcher.start()));
+       }
+
+       separatorStart = matcher.end();
+     }
+     if (lColumns.isEmpty()) {
+       /* without this check the failure surfaces later as a NegativeArraySizeException in build() */
+       throw new IllegalArgumentException(
+           "Columns format must reference at least one column, e.g. {0}, but was: " + formatter);
+     }
+     if (separatorStart < formatter.length()) {
+       /* text after the last placeholder */
+       lSeparators.add(formatter.substring(separatorStart, formatter.length()));
+     }
+     columns = Ints.toArray(lColumns);
+     byte[] emptyStringBytes = "".getBytes();
+
+     dstSeparators = new byte[columns.length][];
+
+     /* a column without text after its placeholder gets an empty separator */
+     for (int i = 0; i < columns.length; i++) {
+       if (i < lSeparators.size()) {
+         dstSeparators[i] = lSeparators.get(i).getBytes();
+       } else {
+         dstSeparators[i] = emptyStringBytes;
+       }
+     }
+     srcSeparator = context.getInteger(SRC_SEPARATOR, (int)SRC_SEPARATOR_DFLT).byteValue();
+     this.prefix = lPrefix.getBytes();
+   }
+ }
+
+ /**
+  * Configuration keys of this interceptor, in addition to those inherited from
+  * {@link ColumnFilteringInterceptor.Constants}.
+  */
+ public static class Constants extends ColumnFilteringInterceptor.Constants
+ {
+ /** Context key of the output format, a string with <code>{n}</code> column placeholders. */
+ public static final String COLUMNS_FORMATTER = "columnsFormatter";
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ColumnFilteringFormattingInterceptor.class);
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cfe153c/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java b/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
new file mode 100644
index 0000000..18aac37
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
@@ -0,0 +1,222 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.source;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+/**
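+ * <p>HdfsTestSource class: a Flume source for testing that reads the lines of the gzip-compressed
+ * files found under a directory and emits them as events in timed batches.</p>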
+ *
+ * @since 0.9.4
+ */
+public class HdfsTestSource extends AbstractSource implements EventDrivenSource, Configurable
+{
+ /** Context key of the directory that is searched for input files. */
+ public static final String SOURCE_DIR = "sourceDir";
+ /** Context key of the maximum number of lines emitted per timer run. */
+ public static final String RATE = "rate";
+ /** Context key of a date given as three dash-separated numbers: year, month, day. */
+ public static final String INIT_DATE = "initDate";
+
+ /* not referenced anywhere else in this class */
+ static byte FIELD_SEPARATOR = 2;
+ /** Timer that runs the emit task; created in {@link #start()}. */
+ public Timer emitTimer;
+ @Nonnull
+ String directory;
+ Path directoryPath;
+ int rate;
+ String initDate;
+ /* initDate converted to epoch milliseconds; only used for debug logging in this class */
+ long initTime;
+ /* paths of the .gz files found under the directory, in listing order */
+ List<String> dataFiles;
+ /* epoch milliseconds one calendar day before construction; only used for debug logging in this class */
+ long oneDayBack;
+
+ /* reader over the file at index currentFile of dataFiles */
+ private transient BufferedReader br = null;
+ protected transient FileSystem fs;
+ private transient Configuration configuration;
+
+ private transient int currentFile = 0;
+ /* while true, the emit task reads no further lines */
+ private transient boolean finished;
+ /* batch buffer, cleared and refilled on every run of the emit task */
+ private List<Event> events;
+
+ /**
+  * Creates a source with a default rate of 2500 lines per timer run, a default Hadoop
+  * configuration and empty file and event lists.
+  */
+ public HdfsTestSource()
+ {
+   rate = 2500;
+   configuration = new Configuration();
+   dataFiles = Lists.newArrayList();
+   events = Lists.newArrayList();
+
+   Calendar yesterday = Calendar.getInstance();
+   yesterday.add(Calendar.DATE, -1);
+   oneDayBack = yesterday.getTimeInMillis();
+ }
+
+ /**
+  * Reads the source settings, lists the {@code .gz} files under the source directory and opens
+  * the first of them for reading.
+  *
+  * @param context Flume context holding {@code sourceDir}, {@code initDate} and optionally {@code rate}
+  * @throws IllegalArgumentException if the directory or the date is missing, the date does not
+  *     consist of three dash-separated parts, or no {@code .gz} file is found
+  * @throws RuntimeException wrapping the {@link IOException} raised while listing or opening files
+  */
+ @Override
+ public void configure(Context context)
+ {
+   directory = context.getString(SOURCE_DIR);
+   rate = context.getInteger(RATE, rate);
+   initDate = context.getString(INIT_DATE);
+
+   Preconditions.checkArgument(!Strings.isNullOrEmpty(directory), "%s must be specified", SOURCE_DIR);
+   directoryPath = new Path(directory);
+
+   /* fail with a message instead of a NullPointerException when the date is not configured */
+   Preconditions.checkArgument(!Strings.isNullOrEmpty(initDate), "%s must be specified", INIT_DATE);
+   String[] parts = initDate.split("-");
+   Preconditions.checkArgument(parts.length == 3, "%s must look like yyyy-MM-dd but was %s", INIT_DATE, initDate);
+   Calendar calendar = Calendar.getInstance();
+   /* Calendar months are zero-based */
+   calendar.set(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) - 1, Integer.parseInt(parts[2]), 0, 0, 0);
+   initTime = calendar.getTimeInMillis();
+
+   try {
+     dataFiles.addAll(findFiles());
+     if (logger.isDebugEnabled()) {
+       SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+       logger.debug("settings {} {} {} {} {}", directory, rate, dateFormat.format(oneDayBack),
+           dateFormat.format(new Date(initTime)), currentFile);
+       for (String file : dataFiles) {
+         logger.debug("settings add file {}", file);
+       }
+     }
+
+     /* fail with a message instead of an IndexOutOfBoundsException when there is nothing to read */
+     Preconditions.checkArgument(!dataFiles.isEmpty(), "no .gz files found under %s", directory);
+
+     fs = FileSystem.newInstance(directoryPath.toUri(), configuration);
+     Path filePath = new Path(dataFiles.get(currentFile));
+     br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath))));
+   } catch (IOException e) {
+     throw new RuntimeException(e);
+   }
+
+   /*
+    * The emit task only reads lines while finished is false, so it has to be false once a file is
+    * open; leaving it true would make the source emit nothing at all.
+    */
+   finished = false;
+ }
+
+ private List<String> findFiles() throws IOException
+ {
+ List<String> files = Lists.newArrayList();
+ Path directoryPath = new Path(directory);
+ FileSystem lfs = FileSystem.newInstance(directoryPath.toUri(), configuration);
+ try {
+ logger.debug("checking for new files in {}", directoryPath);
+ RemoteIterator<LocatedFileStatus> statuses = lfs.listFiles(directoryPath, true);
+ for (; statuses.hasNext(); ) {
+ FileStatus status = statuses.next();
+ Path path = status.getPath();
+ String filePathStr = path.toString();
+ if (!filePathStr.endsWith(".gz")) {
+ continue;
+ }
+ logger.debug("new file {}", filePathStr);
+ files.add(path.toString());
+ }
+ } catch (FileNotFoundException e) {
+ logger.warn("Failed to list directory {}", directoryPath, e);
+ throw new RuntimeException(e);
+ } finally {
+ lfs.close();
+ }
+ return files;
+ }
+
+ /**
+  * Starts the source and schedules a task that runs immediately and then every 1000 ms. Each run
+  * reads up to {@code rate} lines, moving on to the next data file whenever the current one is
+  * exhausted, and hands the lines to the channel processor as one batch. The timer is cancelled
+  * once all files have been read.
+  */
+ @Override
+ public void start()
+ {
+   super.start();
+   emitTimer = new Timer();
+   final ChannelProcessor channelProcessor = getChannelProcessor();
+
+   TimerTask emitTask = new TimerTask()
+   {
+     @Override
+     public void run()
+     {
+       events.clear();
+       try {
+         int emitted = 0;
+         while (emitted < rate && !finished) {
+           String line = br.readLine();
+           if (line != null) {
+             emitted++;
+             events.add(EventBuilder.withBody(line.getBytes()));
+             continue;
+           }
+
+           /* end of the current file: move on to the next one, if any */
+           logger.debug("completed file {}", currentFile);
+           br.close();
+           currentFile++;
+           if (currentFile == dataFiles.size()) {
+             logger.info("finished all files");
+             finished = true;
+             break;
+           }
+           Path filePath = new Path(dataFiles.get(currentFile));
+           br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath))));
+           logger.info("opening file {}. {}", currentFile, filePath);
+         }
+       } catch (IOException e) {
+         throw new RuntimeException(e);
+       }
+       if (!events.isEmpty()) {
+         channelProcessor.processEventBatch(events);
+       }
+       if (finished) {
+         emitTimer.cancel();
+       }
+     }
+   };
+   emitTimer.scheduleAtFixedRate(emitTask, 0, 1000);
+ }
+
+ /**
+  * Cancels the emit timer, if one was created, and stops the source.
+  */
+ @Override
+ public void stop()
+ {
+   /* emitTimer is only created in start(); stop() may be called without a preceding start() */
+   if (emitTimer != null) {
+     emitTimer.cancel();
+   }
+   super.stop();
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(HdfsTestSource.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cfe153c/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
new file mode 100644
index 0000000..aca99c3
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.flume.Context;
+import org.apache.flume.interceptor.Interceptor;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for {@link ColumnFilteringFormattingInterceptor}
+ */
+public class ColumnFilteringFormattingInterceptorTest
+{
+ private static InterceptorTestHelper helper;
+
+ @BeforeClass
+ public static void startUp()
+ {
+ HashMap<String, String> contextMap = new HashMap<String, String>();
+ contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+ contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{1}\001{2}\001{3}\001");
+
+ helper = new InterceptorTestHelper(new ColumnFilteringFormattingInterceptor.Builder(), contextMap);
+ }
+
+ @Test
+ public void testInterceptEvent()
+ {
+ helper.testIntercept_Event();
+ }
+
+ @Test
+ public void testFiles() throws IOException, URISyntaxException
+ {
+ helper.testFiles();
+ }
+
+ @Test
+ public void testInterceptEventWithPrefix()
+ {
+ HashMap<String, String> contextMap = new HashMap<String, String>();
+ contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+ contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "\001{1}\001{2}\001{3}\001");
+
+ ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+ builder.configure(new Context(contextMap));
+ Interceptor interceptor = builder.build();
+
+ assertArrayEquals("Six Fields",
+ "\001\001Second\001\001".getBytes(),
+ interceptor.intercept(
+ new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody());
+ }
+
+ @Test
+ public void testInterceptEventWithLongSeparator()
+ {
+ HashMap<String, String> contextMap = new HashMap<String, String>();
+ contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+ contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}ghi");
+
+ ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+ builder.configure(new Context(contextMap));
+ Interceptor interceptor = builder.build();
+ byte[] body = interceptor.intercept(
+ new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody();
+
+ assertArrayEquals("Six Fields, " + new String(body), "abcSeconddefghi".getBytes(), body);
+ }
+
+ @Test
+ public void testInterceptEventWithTerminatingSeparator()
+ {
+ HashMap<String, String> contextMap = new HashMap<String, String>();
+ contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+ contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}");
+
+ ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+ builder.configure(new Context(contextMap));
+ Interceptor interceptor = builder.build();
+ byte[] body = interceptor.intercept(
+ new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody();
+
+ assertArrayEquals("Six Fields, " + new String(body), "abcSeconddef".getBytes(), body);
+ }
+
+ @Test
+ public void testInterceptEventWithColumnZero()
+ {
+ HashMap<String, String> contextMap = new HashMap<String, String>();
+ contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+ contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{0}\001");
+
+ ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+ builder.configure(new Context(contextMap));
+ Interceptor interceptor = builder.build();
+
+ assertArrayEquals("Empty Bytes",
+ "\001".getBytes(),
+ interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody());
+
+ assertArrayEquals("One Field",
+ "First\001".getBytes(),
+ interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody());
+
+ assertArrayEquals("Two Fields",
+ "\001".getBytes(),
+ interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody());
+ }
+}