Posted to commits@apex.apache.org by th...@apache.org on 2017/05/23 01:24:09 UTC

[11/13] apex-malhar git commit: Interceptor and hdfs test source

Interceptor and hdfs test source


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2cfe153c
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2cfe153c
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2cfe153c

Branch: refs/heads/master
Commit: 2cfe153c98dd05b8185beaf58d6db8c4b7ec9408
Parents: 4432651
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Sun Feb 19 21:34:49 2017 +0530
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon May 22 16:47:34 2017 -0700

----------------------------------------------------------------------
 .../ColumnFilteringFormattingInterceptor.java   | 228 +++++++++++++++++++
 .../flume/source/HdfsTestSource.java            | 222 ++++++++++++++++++
 ...olumnFilteringFormattingInterceptorTest.java | 134 +++++++++++
 3 files changed, 584 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cfe153c/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java
new file mode 100644
index 0000000..ce92f6d
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptor.java
@@ -0,0 +1,228 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.interceptor.Interceptor;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+
+import static com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER;
+import static com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.SRC_SEPARATOR;
+import static com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.SRC_SEPARATOR_DFLT;
+
+/**
+ * <p>ColumnFilteringFormattingInterceptor class.</p>
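+ * <p>Filters columns out of an event body and re-assembles the kept columns
+ * according to a format string. As exercised in the unit tests below, the
+ * formatter {@code "a{1}bc{2}def{3}ghi"} keeps source columns 1, 2 and 3,
+ * writes {@code "a"} as a prefix and appends {@code "bc"}, {@code "def"} and
+ * {@code "ghi"} after the respective columns.</p>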
+ *
+ * @author Chandni Singh <ch...@datatorrent.com>
+ * @since 0.9.4
+ */
+public class ColumnFilteringFormattingInterceptor implements Interceptor
+{
+  private final byte srcSeparator;
+  private final byte[][] dstSeparators;
+  private final byte[] prefix;
+  private final int maxIndex;
+  private final int maxColumn;
+  private final int[] columns;
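+  /* scratch buffer reused across events: positions[n] holds the offset just
+     past the n-th source separator; positions[0] stays 0 */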
+  private final int[] positions;
+
+  private ColumnFilteringFormattingInterceptor(int[] columns, byte srcSeparator, byte[][] dstSeparators, byte[] prefix)
+  {
+    this.columns = columns;
+
+    int tempMaxColumn = Integer.MIN_VALUE;
+    for (int column : columns) {
+      if (column > tempMaxColumn) {
+        tempMaxColumn = column;
+      }
+    }
+    maxIndex = tempMaxColumn;
+    maxColumn = tempMaxColumn + 1;
+    positions = new int[maxColumn + 1];
+    this.srcSeparator = srcSeparator;
+    this.dstSeparators = dstSeparators;
+    this.prefix = prefix;
+  }
+
+  @Override
+  public void initialize()
+  {
+    /* no-op */
+  }
+
+  @Override
+  public Event intercept(Event event)
+  {
+    byte[] body = event.getBody();
+    if (body == null) {
+      return event;
+    }
+
+    final int length = body.length;
+
+    /* store positions of character after the separators */
+    int i = 0;
+    int index = 0;
+    while (i < length) {
+      if (body[i++] == srcSeparator) {
+        positions[++index] = i;
+        if (index >= maxIndex) {
+          break;
+        }
+      }
+    }
+
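+    /*
+     * If the scan hit end-of-body before maxIndex separators were found, the
+     * last selected column ends at the body's end; record whether a separator
+     * terminated it so its final byte is not trimmed below.
+     */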
+    int nextVirginIndex;
+    boolean separatorAtEnd = true;
+    if (i == length && index < maxColumn) {
+      nextVirginIndex = index + 2;
+      positions[nextVirginIndex - 1] = length;
+      separatorAtEnd = length > 0 && body[length - 1] == srcSeparator;
+    } else {
+      nextVirginIndex = index + 1;
+    }
+
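+    /* Size the output buffer: prefix, plus each kept column (minus its
+       trailing source separator, when present), plus every destination
+       separator. */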
+    int newArrayLen = prefix.length;
+    for (i = columns.length; i-- > 0; ) {
+      int column = columns[i];
+      int len = positions[column + 1] - positions[column];
+      if (len > 0) {
+        if (positions[column + 1] == length && !separatorAtEnd) {
+          newArrayLen += len;
+        } else {
+          newArrayLen += len - 1;
+        }
+      }
+      newArrayLen += dstSeparators[i].length;
+    }
+
+    byte[] newBody = new byte[newArrayLen];
+    int newOffset = 0;
+    if (prefix.length > 0) {
+      System.arraycopy(prefix, 0, newBody, 0, prefix.length);
+      newOffset += prefix.length;
+    }
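+    /* Copy each kept column, trim a trailing source separator if one was
+       copied, then append that column's destination separator. */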
+    int dstSeparatorsIdx = 0;
+    for (int column : columns) {
+      int len = positions[column + 1] - positions[column];
+      byte[] separator = dstSeparators[dstSeparatorsIdx++];
+      if (len > 0) {
+        System.arraycopy(body, positions[column], newBody, newOffset, len);
+        newOffset += len;
+        if (newBody[newOffset - 1] == srcSeparator) {
+          newOffset--;
+        }
+      }
+      System.arraycopy(separator, 0, newBody, newOffset, separator.length);
+      newOffset += separator.length;
+    }
+    event.setBody(newBody);
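+    // Zero only the entries written above so the positions array can be reused for the next event.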
+    Arrays.fill(positions, 1, nextVirginIndex, 0);
+    return event;
+  }
+
+  @Override
+  public List<Event> intercept(List<Event> events)
+  {
+    for (Event event : events) {
+      intercept(event);
+    }
+    return events;
+  }
+
+  @Override
+  public void close()
+  {
+  }
+
+  public static class Builder implements Interceptor.Builder
+  {
+    private int[] columns;
+    private byte srcSeparator;
+    private byte[][] dstSeparators;
+    private byte[] prefix;
+
+    @Override
+    public Interceptor build()
+    {
+      return new ColumnFilteringFormattingInterceptor(columns, srcSeparator, dstSeparators, prefix);
+    }
+
+    @Override
+    public void configure(Context context)
+    {
+      String formatter = context.getString(COLUMNS_FORMATTER);
+      if (Strings.isNullOrEmpty(formatter)) {
+        throw new IllegalArgumentException("This interceptor requires columns format to be specified!");
+      }
+      List<String> lSeparators = Lists.newArrayList();
+      List<Integer> lColumns = Lists.newArrayList();
+      Pattern colPat = Pattern.compile("\\{\\d+?\\}");
+      Matcher matcher = colPat.matcher(formatter);
+      int separatorStart = 0;
+      String lPrefix = "";
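+      // Text before the first {n} token becomes the prefix; text between
+      // consecutive tokens becomes the separator appended after the preceding column.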
+      while (matcher.find()) {
+        String col = matcher.group();
+        lColumns.add(Integer.parseInt(col.substring(1, col.length() - 1)));
+        if (separatorStart == 0 && matcher.start() > 0) {
+          lPrefix = formatter.substring(0, matcher.start());
+        } else if (separatorStart > 0) {
+          lSeparators.add(formatter.substring(separatorStart, matcher.start()));
+        }
+
+        separatorStart = matcher.end();
+      }
+      if (separatorStart < formatter.length()) {
+        lSeparators.add(formatter.substring(separatorStart));
+      }
+      columns = Ints.toArray(lColumns);
+      byte[] emptyStringBytes = "".getBytes();
+
+      dstSeparators = new byte[columns.length][];
+
+      for (int i = 0; i < columns.length; i++) {
+        if (i < lSeparators.size()) {
+          dstSeparators[i] = lSeparators.get(i).getBytes();
+        } else {
+          dstSeparators[i] = emptyStringBytes;
+        }
+      }
+      srcSeparator = context.getInteger(SRC_SEPARATOR, (int)SRC_SEPARATOR_DFLT).byteValue();
+      this.prefix = lPrefix.getBytes();
+    }
+  }
+
+  public static class Constants extends ColumnFilteringInterceptor.Constants
+  {
+    public static final String COLUMNS_FORMATTER = "columnsFormatter";
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ColumnFilteringFormattingInterceptor.class);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cfe153c/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
----------------------------------------------------------------------
diff --git a/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java b/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
new file mode 100644
index 0000000..18aac37
--- /dev/null
+++ b/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java
@@ -0,0 +1,222 @@
+/**
+ * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.source;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+/**
+ * <p>HdfsTestSource class.</p>
+ * <p>A Flume test source that replays gzipped files from a directory, emitting
+ * each line as an event at a configurable rate.</p>
+ *
+ * @since 0.9.4
+ */
+public class HdfsTestSource extends AbstractSource implements EventDrivenSource, Configurable
+{
+  public static final String SOURCE_DIR = "sourceDir";
+  public static final String RATE = "rate";
+  public static final String INIT_DATE = "initDate";
+
+  static byte FIELD_SEPARATOR = 2;
+  public Timer emitTimer;
+  @Nonnull
+  String directory;
+  Path directoryPath;
+  int rate;
+  String initDate;
+  long initTime;
+  List<String> dataFiles;
+  long oneDayBack;
+
+  private transient BufferedReader br = null;
+  protected transient FileSystem fs;
+  private transient Configuration configuration;
+
+  private transient int currentFile = 0;
+  private transient boolean finished;
+  private List<Event> events;
+
+  public HdfsTestSource()
+  {
+    super();
+    this.rate = 2500;
+    dataFiles = Lists.newArrayList();
+    Calendar calendar = Calendar.getInstance();
+    calendar.add(Calendar.DATE, -1);
+    oneDayBack = calendar.getTimeInMillis();
+    configuration = new Configuration();
+    events = Lists.newArrayList();
+  }
+
+  @Override
+  public void configure(Context context)
+  {
+    directory = context.getString(SOURCE_DIR);
+    rate = context.getInteger(RATE, rate);
+    initDate = context.getString(INIT_DATE);
+
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(directory), "sourceDir is required");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(initDate), "initDate is required");
+    directoryPath = new Path(directory);
+
+    String[] parts = initDate.split("-");
+    Preconditions.checkArgument(parts.length == 3);
+    Calendar calendar = Calendar.getInstance();
+    calendar.set(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) - 1, Integer.parseInt(parts[2]), 0, 0, 0);
+    initTime = calendar.getTimeInMillis();
+
+    try {
+      dataFiles.addAll(findFiles());
+      if (logger.isDebugEnabled()) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+        logger.debug("settings {} {} {} {} {}", directory, rate, dateFormat.format(oneDayBack),
+            dateFormat.format(new Date(initTime)), currentFile);
+        for (String file : dataFiles) {
+          logger.debug("settings add file {}", file);
+        }
+      }
+
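+      // Open the first data file eagerly so the timer task in start() can begin reading immediately.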
+      fs = FileSystem.newInstance(new Path(directory).toUri(), configuration);
+      Path filePath = new Path(dataFiles.get(currentFile));
+      br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath))));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Not finished yet; the timer task in start() sets this to true after the last file is read.
+    finished = false;
+
+  }
+
+  private List<String> findFiles() throws IOException
+  {
+    List<String> files = Lists.newArrayList();
+    Path directoryPath = new Path(directory);
+    FileSystem lfs = FileSystem.newInstance(directoryPath.toUri(), configuration);
+    try {
+      logger.debug("checking for new files in {}", directoryPath);
+      RemoteIterator<LocatedFileStatus> statuses = lfs.listFiles(directoryPath, true);
+      while (statuses.hasNext()) {
+        FileStatus status = statuses.next();
+        Path path = status.getPath();
+        String filePathStr = path.toString();
+        if (!filePathStr.endsWith(".gz")) {
+          continue;
+        }
+        logger.debug("new file {}", filePathStr);
+        files.add(path.toString());
+      }
+    } catch (FileNotFoundException e) {
+      logger.warn("Failed to list directory {}", directoryPath, e);
+      throw new RuntimeException(e);
+    } finally {
+      lfs.close();
+    }
+    return files;
+  }
+
+  @Override
+  public void start()
+  {
+    super.start();
+    emitTimer = new Timer();
+
+    final ChannelProcessor channelProcessor = getChannelProcessor();
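+    // Every second, read up to 'rate' lines from the current file and hand them
+    // to the channel as one batch; move to the next file on EOF.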
+    emitTimer.scheduleAtFixedRate(new TimerTask()
+    {
+      @Override
+      public void run()
+      {
+        int lineCount = 0;
+        events.clear();
+        try {
+          while (lineCount < rate && !finished) {
+            String line = br.readLine();
+
+            if (line == null) {
+              logger.debug("completed file {}", currentFile);
+              br.close();
+              currentFile++;
+              if (currentFile == dataFiles.size()) {
+                logger.info("finished all files");
+                finished = true;
+                break;
+              }
+              Path filePath = new Path(dataFiles.get(currentFile));
+              br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath))));
+              logger.info("opening file {}. {}", currentFile, filePath);
+              continue;
+            }
+            lineCount++;
+            Event flumeEvent = EventBuilder.withBody(line.getBytes());
+            events.add(flumeEvent);
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        if (!events.isEmpty()) {
+          channelProcessor.processEventBatch(events);
+        }
+        if (finished) {
+          emitTimer.cancel();
+        }
+      }
+
+    }, 0, 1000);
+  }
+
+  @Override
+  public void stop()
+  {
+    emitTimer.cancel();
+    super.stop();
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(HdfsTestSource.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2cfe153c/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
----------------------------------------------------------------------
diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
new file mode 100644
index 0000000..aca99c3
--- /dev/null
+++ b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringFormattingInterceptorTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.flume.interceptor;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.flume.Context;
+import org.apache.flume.interceptor.Interceptor;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for {@link ColumnFilteringFormattingInterceptor}
+ */
+public class ColumnFilteringFormattingInterceptorTest
+{
+  private static InterceptorTestHelper helper;
+
+  @BeforeClass
+  public static void startUp()
+  {
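+    // Byte 0x02 separates columns in the source body; the formatter keeps
+    // columns 1-3 and terminates each with byte 0x01.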
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{1}\001{2}\001{3}\001");
+
+    helper = new InterceptorTestHelper(new ColumnFilteringFormattingInterceptor.Builder(), contextMap);
+  }
+
+  @Test
+  public void testInterceptEvent()
+  {
+    helper.testIntercept_Event();
+  }
+
+  @Test
+  public void testFiles() throws IOException, URISyntaxException
+  {
+    helper.testFiles();
+  }
+
+  @Test
+  public void testInterceptEventWithPrefix()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "\001{1}\001{2}\001{3}\001");
+
+    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+    builder.configure(new Context(contextMap));
+    Interceptor interceptor = builder.build();
+
+    assertArrayEquals("Six Fields",
+        "\001\001Second\001\001".getBytes(),
+        interceptor.intercept(
+        new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody());
+  }
+
+  @Test
+  public void testInterceptEventWithLongSeparator()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}ghi");
+
+    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+    builder.configure(new Context(contextMap));
+    Interceptor interceptor = builder.build();
+    byte[] body = interceptor.intercept(
+        new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody();
+
+    assertArrayEquals("Six Fields, " + new String(body), "abcSeconddefghi".getBytes(), body);
+  }
+
+  @Test
+  public void testInterceptEventWithTerminatingSeparator()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "a{1}bc{2}def{3}");
+
+    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+    builder.configure(new Context(contextMap));
+    Interceptor interceptor = builder.build();
+    byte[] body = interceptor.intercept(
+        new InterceptorTestHelper.MyEvent("First\002\002Second\002\002\002".getBytes())).getBody();
+
+    assertArrayEquals("Six Fields, " + new String(body), "abcSeconddef".getBytes(), body);
+  }
+
+  @Test
+  public void testInterceptEventWithColumnZero()
+  {
+    HashMap<String, String> contextMap = new HashMap<String, String>();
+    contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2));
+    contextMap.put(ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER, "{0}\001");
+
+    ColumnFilteringFormattingInterceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
+    builder.configure(new Context(contextMap));
+    Interceptor interceptor = builder.build();
+
+    assertArrayEquals("Empty Bytes",
+        "\001".getBytes(),
+        interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody());
+
+    assertArrayEquals("One Field",
+        "First\001".getBytes(),
+        interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody());
+
+    assertArrayEquals("Two Fields",
+        "\001".getBytes(),
+        interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody());
+  }
+}