You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/04/07 23:47:06 UTC

flume git commit: FLUME-2613. Add support in FileChannelIntegrityTool to remove invalid events from the channel.

Repository: flume
Updated Branches:
  refs/heads/trunk cfefda167 -> 91ec57945


FLUME-2613. Add support in FileChannelIntegrityTool to remove invalid events from the channel.

(Ashish Paliwal via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/91ec5794
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/91ec5794
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/91ec5794

Branch: refs/heads/trunk
Commit: 91ec5794589bf3711cca2a251a511fa360e5ac30
Parents: cfefda1
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Apr 7 14:46:10 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Apr 7 14:47:10 2015 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/file/EventUtils.java   | 41 ++++++++
 .../flume/channel/file/TestEventUtils.java      | 44 +++++++++
 .../org/apache/flume/tools/EventValidator.java  | 49 ++++++++++
 .../flume/tools/FileChannelIntegrityTool.java   | 99 +++++++++++++++++++-
 .../tools/TestFileChannelIntegrityTool.java     | 75 +++++++++++++++
 5 files changed, 306 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/91ec5794/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
new file mode 100644
index 0000000..ff5242a
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel.file;
+
+import org.apache.flume.Event;
+
+/**
+ * Utility methods for extracting Events from transaction event records.
+ */
+public class EventUtils {
+
+  /**
+   * Unwraps the Event carried by a transaction event record.
+   *
+   * @param transactionEventRecord record to inspect
+   * @return the wrapped Event when the record is a Put, null otherwise
+   */
+  public static Event getEventFromTransactionEvent(TransactionEventRecord transactionEventRecord) {
+    if (!(transactionEventRecord instanceof Put)) {
+      return null;
+    }
+    return ((Put) transactionEventRecord).getEvent();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/91ec5794/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java
new file mode 100644
index 0000000..c72e3f2
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel.file;
+
+import junit.framework.Assert;
+import org.apache.flume.Event;
+import org.junit.Test;
+
+public class TestEventUtils {
+
+  @Test
+  public void testPutEvent() {
+    // A Put record must hand back the event it wraps.
+    Put putRecord = new Put(1L, 1L, new FlumeEvent(null, new byte[5]));
+    Event unwrapped = EventUtils.getEventFromTransactionEvent(putRecord);
+    Assert.assertNotNull(unwrapped);
+    Assert.assertEquals(5, unwrapped.getBody().length);
+  }
+
+  @Test
+  public void testInvalidEvent() {
+    // A Take record is not a Put, so no event can be extracted from it.
+    Event unwrapped = EventUtils.getEventFromTransactionEvent(new Take(1L, 1L));
+    Assert.assertNull(unwrapped);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/91ec5794/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java b/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java
new file mode 100644
index 0000000..10e677d
--- /dev/null
+++ b/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.tools;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+
+/**
+ * Event Validator interface to be used for validating Events per custom logic.
+ */
+public interface EventValidator {
+
+  /**
+   * Validates the Event in an application-specific manner.
+   * @param event Flume Event
+   * @return true if the Event is valid as per application logic
+   */
+  boolean validateEvent(Event event);
+
+  /** Validator that reports every Event as valid. */
+  EventValidator NOOP_VALIDATOR = new EventValidator() {
+    @Override
+    public boolean validateEvent(Event event) {
+      return true;
+    }
+  };
+
+  /** Configurable factory that produces an EventValidator through build(). */
+  interface Builder extends Configurable {
+    EventValidator build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/91ec5794/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
index 1030442..7abb7eb 100644
--- a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
+++ b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
@@ -22,15 +22,21 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.flume.channel.file.CorruptEventException;
+import org.apache.flume.channel.file.EventUtils;
 import org.apache.flume.channel.file.Log;
 import org.apache.flume.channel.file.LogFile;
 import org.apache.flume.channel.file.LogFileV3;
 import org.apache.flume.channel.file.LogRecord;
 import org.apache.flume.channel.file.Serialization;
+import org.apache.flume.channel.file.TransactionEventRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +45,8 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
+import java.util.Set;
 
 public class FileChannelIntegrityTool implements FlumeTool {
   public static final Logger LOG = LoggerFactory.getLogger
@@ -46,6 +54,15 @@ public class FileChannelIntegrityTool implements FlumeTool {
 
   private final List<File> dataDirs = new ArrayList<File>();
 
+  private EventValidator eventValidator = EventValidator.NOOP_VALIDATOR;
+
+  private long totalPutEvents;
+  private long invalidEvents;
+  private long eventsWithException;
+  private long corruptEvents;
+  private long validEvents;
+  private long totalChannelEvents;
+
   @Override
   public void run(String[] args) throws IOException, ParseException {
     boolean shouldContinue = parseCommandLineOpts(args);
@@ -85,12 +102,39 @@ public class FileChannelIntegrityTool implements FlumeTool {
               // this will throw a CorruptEventException - so the real logic
               // is in the catch block below.
               LogRecord record = reader.next();
+              totalChannelEvents++;
               if (record != null) {
-                record.getEvent();
+                TransactionEventRecord recordEvent = record.getEvent();
+                Event event = EventUtils.getEventFromTransactionEvent(recordEvent);
+                if(event != null) {
+                  totalPutEvents++;
+                  try {
+                    if (!eventValidator.validateEvent(event)) {
+                      if (!fileBackedup) {
+                        Serialization.copyFile(dataFile, new File(dataFile.getParent(),
+                                dataFile.getName() + ".bak"));
+                        fileBackedup = true;
+                      }
+                      invalidEvents++;
+                      updater.markRecordAsNoop(eventPosition);
+                    } else {
+                      validEvents++;
+                    }
+                  } catch (Exception e) {
+                    // OOPS, didn't expected an exception
+                    // considering as failure case
+                    // marking as noop
+                    System.err.println("Encountered Exception while validating event, marking as invalid");
+                    updater.markRecordAsNoop(eventPosition);
+                    eventsWithException++;
+                  }
+                }
               } else {
                 fileDone = true;
               }
             } catch (CorruptEventException e) {
+              corruptEvents++;
+              totalChannelEvents++;
               LOG.warn("Corruption found in " + dataFile.toString() + " at "
                 + eventPosition);
               if (!fileBackedup) {
@@ -106,6 +150,7 @@ public class FileChannelIntegrityTool implements FlumeTool {
         }
       }
     }
+    printSummary();
   }
 
   private boolean parseCommandLineOpts(String[] args) throws ParseException {
@@ -113,7 +158,17 @@ public class FileChannelIntegrityTool implements FlumeTool {
     options
       .addOption("l", "dataDirs", true, "Comma-separated list of data " +
         "directories which the tool must verify. This option is mandatory")
-      .addOption("h", "help", false, "Display help");
+      .addOption("h", "help", false, "Display help")
+            .addOption("e", "eventValidator", true, "Fully Qualified Name of Event Validator Implementation");;
+
+
+    Option property  = OptionBuilder.withArgName("property=value")
+            .hasArgs(2)
+            .withValueSeparator()
+            .withDescription( "custom properties" )
+            .create( "D" );
+
+    options.addOption(property);
 
     CommandLineParser parser = new GnuParser();
     CommandLine commandLine = parser.parse(options, args);
@@ -137,6 +192,46 @@ public class FileChannelIntegrityTool implements FlumeTool {
         dataDirs.add(f);
       }
     }
+
+    if(commandLine.hasOption("eventValidator")) {
+      try {
+        Class<? extends EventValidator.Builder> eventValidatorClassName =
+                (Class<? extends EventValidator.Builder>)Class.forName(
+                  commandLine.getOptionValue("eventValidator"));
+        EventValidator.Builder eventValidatorBuilder = eventValidatorClassName.newInstance();
+
+        // Hand every -D property=value pair to the builder as its configuration
+        Properties systemProperties = commandLine.getOptionProperties("D");
+        Context context = new Context();
+
+        Set<String> keys = systemProperties.stringPropertyNames();
+        for (String key : keys) {
+          context.put(key, systemProperties.getProperty(key));
+        }
+        eventValidatorBuilder.configure(context);
+        eventValidator = eventValidatorBuilder.build();
+      } catch (Exception e) {
+        System.err.println(String.format("Could not find class %s in lib folder",
+                commandLine.getOptionValue("eventValidator")));
+        e.printStackTrace();
+        return false;
+      }
+    }
     return true;
   }
+
+  /**
+   * Prints the summary of the run: the total record count, the Put event
+   * counts (processed, valid, invalid, failed validation) and corrupt events.
+   */
+  private void printSummary() {
+    System.out.println("---------- Summary --------------------");
+    System.out.println("Number of Events in the Channel = "+totalChannelEvents);
+    System.out.println("Number of Put Events Processed = "+totalPutEvents);
+    System.out.println("Number of Valid Put Events = "+validEvents);
+    System.out.println("Number of Invalid Put Events = "+invalidEvents);
+    System.out.println("Number of Put Events that threw Exception during validation = "+eventsWithException);
+    System.out.println("Number of Corrupt Events = "+corruptEvents);
+    System.out.println("---------------------------------------");
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/91ec5794/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
index f24ae56..ac4dac4 100644
--- a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
+++ b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
@@ -58,6 +58,7 @@ public class TestFileChannelIntegrityTool {
   private File checkpointDir;
   private File dataDir;
 
+  private static int invalidEvent = 0;
 
   @BeforeClass
   public static void setUpClass() throws Exception{
@@ -97,6 +98,45 @@ public class TestFileChannelIntegrityTool {
   }
 
   @Test
+  public void testFixInvalidRecords() throws Exception {
+    doTestFixInvalidEvents(false, DummyEventVerifier.Builder.class.getName());
+  }
+  @Test
+  public void testFixInvalidRecordsWithCheckpoint() throws Exception {
+    doTestFixInvalidEvents(true, DummyEventVerifier.Builder.class.getName());
+  }
+
+  public void doTestFixInvalidEvents(boolean withCheckpoint, String eventHandler) throws Exception {
+    FileChannelIntegrityTool integrityTool = new FileChannelIntegrityTool();
+    integrityTool.run(new String[] {"-l", dataDir.toString(), "-e", eventHandler, "-DvalidatorValue=0"});
+    FileChannel fileChannel = new FileChannel();
+    fileChannel.setName("channel");
+    String checkpointPath; // original checkpoint when requested, otherwise a fresh empty one
+    if (!withCheckpoint) {
+      FileUtils.deleteDirectory(checkpointDir);
+      Assert.assertTrue(checkpointDir.mkdirs());
+      checkpointPath = checkpointDir.toString();
+    } else {
+      checkpointPath = origCheckpointDir.toString();
+    }
+    ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointPath);
+    ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString());
+    fileChannel.configure(ctx);
+    fileChannel.start();
+    Transaction transaction = fileChannel.getTransaction();
+    transaction.begin();
+    int taken = 0; // number of events still readable after the tool ran
+    while (fileChannel.take() != null) {
+      taken++;
+    }
+    transaction.commit();
+    transaction.close();
+    fileChannel.stop();
+    Assert.assertTrue(invalidEvent != 0);
+    Assert.assertEquals(25 - invalidEvent, taken);
+  }
+
+  @Test
   public void testFixCorruptRecords() throws Exception {
     doTestFixCorruptEvents(false);
   }
@@ -226,6 +266,12 @@ public class TestFileChannelIntegrityTool {
       Transaction tx = channel.getTransaction();
       tx.begin();
       for (int i = 0; i < 5; i++) {
+        if(i % 3 == 0) {
+          event.getBody()[0] = 0;
+          invalidEvent++;
+        } else {
+          event.getBody()[0] = 1;
+        }
         channel.put(event);
       }
       tx.commit();
@@ -244,4 +290,33 @@ public class TestFileChannelIntegrityTool {
         .invoke(true));
     channel.stop();
   }
+
+  public static class DummyEventVerifier implements EventValidator {
+
+    private final int rejectedValue; // an event whose first body byte equals this is invalid
+
+    private DummyEventVerifier(int val) {
+      this.rejectedValue = val;
+    }
+
+    @Override
+    public boolean validateEvent(Event event) {
+      return !(event.getBody()[0] == rejectedValue);
+    }
+
+    public static class Builder implements EventValidator.Builder {
+
+      private int configuredValue = 0; // read from the "validatorValue" context key
+
+      @Override
+      public EventValidator build() {
+        return new DummyEventVerifier(configuredValue);
+      }
+
+      @Override
+      public void configure(Context context) {
+        this.configuredValue = context.getInteger("validatorValue");
+      }
+    }
+  }
 }