You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/04/07 23:47:06 UTC
flume git commit: FLUME-2613. Add support in FileChannelIntegrityTool
to remove invalid events from the channel.
Repository: flume
Updated Branches:
refs/heads/trunk cfefda167 -> 91ec57945
FLUME-2613. Add support in FileChannelIntegrityTool to remove invalid events from the channel.
(Ashish Paliwal via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/91ec5794
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/91ec5794
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/91ec5794
Branch: refs/heads/trunk
Commit: 91ec5794589bf3711cca2a251a511fa360e5ac30
Parents: cfefda1
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Apr 7 14:46:10 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Apr 7 14:47:10 2015 -0700
----------------------------------------------------------------------
.../apache/flume/channel/file/EventUtils.java | 41 ++++++++
.../flume/channel/file/TestEventUtils.java | 44 +++++++++
.../org/apache/flume/tools/EventValidator.java | 49 ++++++++++
.../flume/tools/FileChannelIntegrityTool.java | 99 +++++++++++++++++++-
.../tools/TestFileChannelIntegrityTool.java | 75 +++++++++++++++
5 files changed, 306 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/91ec5794/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
new file mode 100644
index 0000000..ff5242a
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel.file;
+
+import org.apache.flume.Event;
+
+/**
+ * Static helpers for pulling {@link Event}s out of file channel transaction records.
+ */
+public class EventUtils {
+
+  /**
+   * Extracts the {@link Event} carried by a transaction record.
+   *
+   * @param transactionEventRecord record to inspect; may be {@code null}
+   * @return the wrapped Event when the record is a {@code Put}, {@code null} otherwise
+   */
+  public static Event getEventFromTransactionEvent(TransactionEventRecord transactionEventRecord) {
+    if (!(transactionEventRecord instanceof Put)) {
+      return null;
+    }
+    return ((Put) transactionEventRecord).getEvent();
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/91ec5794/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java
new file mode 100644
index 0000000..c72e3f2
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel.file;
+
+import junit.framework.Assert;
+import org.apache.flume.Event;
+import org.junit.Test;
+
+public class TestEventUtils {
+
+  @Test
+  public void testPutEvent() {
+    // A Put record must hand back the event it wraps, with its 5-byte body.
+    Put putRecord = new Put(1L, 1L, new FlumeEvent(null, new byte[5]));
+    Event extracted = EventUtils.getEventFromTransactionEvent(putRecord);
+    Assert.assertNotNull(extracted);
+    Assert.assertEquals(5, extracted.getBody().length);
+  }
+
+  @Test
+  public void testInvalidEvent() {
+    // A Take record is not a Put, so the helper must return null.
+    Event extracted = EventUtils.getEventFromTransactionEvent(new Take(1L, 1L));
+    Assert.assertNull(extracted);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/91ec5794/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java b/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java
new file mode 100644
index 0000000..10e677d
--- /dev/null
+++ b/flume-tools/src/main/java/org/apache/flume/tools/EventValidator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.tools;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+
+/**
+ * Validates Events per custom logic. {@link #NOOP_VALIDATOR} accepts every Event;
+ * {@link Builder} is the Configurable factory used to create configured validators.
+ */
+public interface EventValidator {
+
+ /**
+ * Validates the Event in an application-specific manner.
+ *
+ * @param event Flume Event
+ * @return true if the Event is valid as per application logic
+ */
+ boolean validateEvent(Event event);
+
+ EventValidator NOOP_VALIDATOR = new EventValidator() {
+ @Override
+ public boolean validateEvent(Event event) {
+ return true;
+ }
+ };
+
+ interface Builder extends Configurable {
+ EventValidator build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/91ec5794/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
index 1030442..7abb7eb 100644
--- a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
+++ b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java
@@ -22,15 +22,21 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.file.CorruptEventException;
+import org.apache.flume.channel.file.EventUtils;
import org.apache.flume.channel.file.Log;
import org.apache.flume.channel.file.LogFile;
import org.apache.flume.channel.file.LogFileV3;
import org.apache.flume.channel.file.LogRecord;
import org.apache.flume.channel.file.Serialization;
+import org.apache.flume.channel.file.TransactionEventRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +45,8 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
+import java.util.Set;
public class FileChannelIntegrityTool implements FlumeTool {
public static final Logger LOG = LoggerFactory.getLogger
@@ -46,6 +54,15 @@ public class FileChannelIntegrityTool implements FlumeTool {
private final List<File> dataDirs = new ArrayList<File>();
+ private EventValidator eventValidator = EventValidator.NOOP_VALIDATOR; // replaced when the eventValidator option is given
+
+ private long totalPutEvents; // records from which a non-null Event was extracted
+ private long invalidEvents; // events rejected by the validator and marked as noop
+ private long eventsWithException; // events whose validation threw; also marked as noop
+ private long corruptEvents; // reads that ended in a CorruptEventException
+ private long validEvents; // events accepted by the validator
+ private long totalChannelEvents; // bumped after every reader.next() (even when it returns null) and on corruption
+
@Override
public void run(String[] args) throws IOException, ParseException {
boolean shouldContinue = parseCommandLineOpts(args);
@@ -85,12 +102,39 @@ public class FileChannelIntegrityTool implements FlumeTool {
// this will throw a CorruptEventException - so the real logic
// is in the catch block below.
LogRecord record = reader.next();
+ totalChannelEvents++;
if (record != null) {
- record.getEvent();
+ TransactionEventRecord recordEvent = record.getEvent();
+ Event event = EventUtils.getEventFromTransactionEvent(recordEvent);
+ if(event != null) {
+ totalPutEvents++;
+ try {
+ if (!eventValidator.validateEvent(event)) {
+ if (!fileBackedup) {
+ Serialization.copyFile(dataFile, new File(dataFile.getParent(),
+ dataFile.getName() + ".bak"));
+ fileBackedup = true;
+ }
+ invalidEvents++;
+ updater.markRecordAsNoop(eventPosition);
+ } else {
+ validEvents++;
+ }
+ } catch (Exception e) {
+ // OOPS, didn't expect an exception
+ // considering as failure case
+ // marking as noop
+ System.err.println("Encountered Exception while validating event, marking as invalid");
+ updater.markRecordAsNoop(eventPosition);
+ eventsWithException++;
+ }
+ }
} else {
fileDone = true;
}
} catch (CorruptEventException e) {
+ corruptEvents++;
+ totalChannelEvents++;
LOG.warn("Corruption found in " + dataFile.toString() + " at "
+ eventPosition);
if (!fileBackedup) {
@@ -106,6 +150,7 @@ public class FileChannelIntegrityTool implements FlumeTool {
}
}
}
+ printSummary();
}
private boolean parseCommandLineOpts(String[] args) throws ParseException {
@@ -113,7 +158,17 @@ public class FileChannelIntegrityTool implements FlumeTool {
options
.addOption("l", "dataDirs", true, "Comma-separated list of data " +
"directories which the tool must verify. This option is mandatory")
- .addOption("h", "help", false, "Display help");
+ .addOption("h", "help", false, "Display help")
+ .addOption("e", "eventValidator", true, "Fully Qualified Name of Event Validator Implementation");;
+
+
+ Option property = OptionBuilder.withArgName("property=value")
+ .hasArgs(2)
+ .withValueSeparator()
+ .withDescription( "custom properties" )
+ .create( "D" );
+
+ options.addOption(property);
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
@@ -137,6 +192,46 @@ public class FileChannelIntegrityTool implements FlumeTool {
dataDirs.add(f);
}
}
+
+ if(commandLine.hasOption("eventValidator")) {
+ try {
+ Class<? extends EventValidator.Builder> eventValidatorClassName =
+ (Class<? extends EventValidator.Builder>)Class.forName(
+ commandLine.getOptionValue("eventValidator"));
+ EventValidator.Builder eventValidatorBuilder = eventValidatorClassName.newInstance();
+
+ // Pass on the configuration parameter
+ Properties systemProperties = commandLine.getOptionProperties("D");
+ Context context = new Context();
+
+ Set<String> keys = systemProperties.stringPropertyNames();
+ for (String key : keys) {
+ context.put(key, systemProperties.getProperty(key));
+ }
+ eventValidatorBuilder.configure(context);
+ eventValidator = eventValidatorBuilder.build();
+ } catch (Exception e) {
+ System.err.println(String.format("Could find class %s in lib folder",
+ commandLine.getOptionValue("eventValidator")));
+ e.printStackTrace();
+ return false;
+ }
+ }
return true;
}
+
+ /**
+  * Prints the run summary to stdout: records read, Put events processed, and how many were
+  * valid, invalid, failed validation with an exception, or corrupt. Does not modify counters.
+  */
+ private void printSummary() {
+   System.out.println("---------- Summary --------------------");
+   System.out.println("Number of Events in the Channel = " + totalChannelEvents);
+   System.out.println("Number of Put Events Processed = " + totalPutEvents);
+   System.out.println("Number of Valid Put Events = " + validEvents);
+   System.out.println("Number of Invalid Put Events = " + invalidEvents);
+   System.out.println("Number of Put Events that threw Exception during validation = " + eventsWithException);
+   System.out.println("Number of Corrupt Events = " + corruptEvents);
+   System.out.println("---------------------------------------");
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/91ec5794/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
----------------------------------------------------------------------
diff --git a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
index f24ae56..ac4dac4 100644
--- a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
+++ b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java
@@ -58,6 +58,7 @@ public class TestFileChannelIntegrityTool {
private File checkpointDir;
private File dataDir;
+ private static int invalidEvent = 0;
@BeforeClass
public static void setUpClass() throws Exception{
@@ -97,6 +98,45 @@ public class TestFileChannelIntegrityTool {
}
@Test
+ public void testFixInvalidRecords() throws Exception {
+ doTestFixInvalidEvents(false, DummyEventVerifier.Builder.class.getName()); // checkpoint dir is wiped and recreated
+ }
+ @Test
+ public void testFixInvalidRecordsWithCheckpoint() throws Exception {
+ doTestFixInvalidEvents(true, DummyEventVerifier.Builder.class.getName()); // channel uses origCheckpointDir
+ }
+
+ public void doTestFixInvalidEvents(boolean withCheckpoint, String eventHandler) throws Exception {
+   // Run the tool first so that events rejected by the validator are marked as noops.
+   FileChannelIntegrityTool integrityTool = new FileChannelIntegrityTool();
+   integrityTool.run(new String[] {"-l", dataDir.toString(), "-e", eventHandler, "-DvalidatorValue=0"});
+   FileChannel channel = new FileChannel();
+   channel.setName("channel");
+   if (!withCheckpoint) {
+     // No checkpoint reuse: start the channel with an empty checkpoint directory.
+     FileUtils.deleteDirectory(checkpointDir);
+     Assert.assertTrue(checkpointDir.mkdirs());
+   }
+   String checkpointPath = withCheckpoint ? origCheckpointDir.toString() : checkpointDir.toString();
+   ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointPath);
+   ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString());
+   channel.configure(ctx);
+   channel.start();
+   Transaction tx = channel.getTransaction();
+   tx.begin();
+   // Drain the channel and count the events that are still takeable.
+   int remaining = 0;
+   while (channel.take() != null) {
+     remaining++;
+   }
+   tx.commit();
+   tx.close();
+   channel.stop();
+   Assert.assertTrue(invalidEvent != 0);
+   Assert.assertEquals(25 - invalidEvent, remaining);
+ }
+
+ @Test
public void testFixCorruptRecords() throws Exception {
doTestFixCorruptEvents(false);
}
@@ -226,6 +266,12 @@ public class TestFileChannelIntegrityTool {
Transaction tx = channel.getTransaction();
tx.begin();
for (int i = 0; i < 5; i++) {
+ if(i % 3 == 0) {
+ event.getBody()[0] = 0;
+ invalidEvent++;
+ } else {
+ event.getBody()[0] = 1;
+ }
channel.put(event);
}
tx.commit();
@@ -244,4 +290,33 @@ public class TestFileChannelIntegrityTool {
.invoke(true));
channel.stop();
}
+
+ public static class DummyEventVerifier implements EventValidator {
+   // Test validator: an event is invalid when the first byte of its body equals 'value'.
+   private final int value;
+
+   private DummyEventVerifier(int val) {
+     value = val;
+   }
+
+   @Override
+   public boolean validateEvent(Event event) {
+     return event.getBody()[0] != value;
+   }
+
+   public static class Builder implements EventValidator.Builder {
+     // Byte value treated as invalid; stays 0 unless "validatorValue" is configured.
+     private int binaryValidator = 0;
+
+     @Override
+     public EventValidator build() {
+       return new DummyEventVerifier(binaryValidator);
+     }
+
+     @Override
+     public void configure(Context context) {
+       binaryValidator = context.getInteger("validatorValue", binaryValidator);
+     }
+   }
+ }
}