You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2014/04/24 23:42:26 UTC
[1/2] OOZIE-1737 Oozie log streaming is slow (puru via rohini)
Repository: oozie
Updated Branches:
refs/heads/master 2ef171156 -> f9176a01b
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java b/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
index c2630e7..f9f1dd4 100644
--- a/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
+++ b/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
@@ -25,6 +25,8 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
+
+import org.apache.oozie.command.CommandException;
import org.apache.oozie.test.XTestCase;
public class TestTimestampedMessageParser extends XTestCase {
@@ -113,16 +115,16 @@ public class TestTimestampedMessageParser extends XTestCase {
return file;
}
- public void testNofindLogs() {
+ public void testNofindLogs() throws CommandException {
// Test of OOZIE-1691
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
xf.setParameter("JOB", "14-200904160239--no-found-C");
xf.setLogLevel("DEBUG|WARN");
try {
@@ -136,18 +138,18 @@ public class TestTimestampedMessageParser extends XTestCase {
}
}
- public void testProcessRemainingLog() throws IOException {
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ public void testProcessRemainingLog() throws IOException, CommandException {
+
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
xf.setLogLevel("DEBUG|WARN");
-
File file = prepareFile1(getTestCaseDir());
StringWriter sw = new StringWriter();
new TimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw, 4096);
@@ -169,15 +171,15 @@ public class TestTimestampedMessageParser extends XTestCase {
assertTrue(out[13].contains("_L17_"));
}
- public void testProcessRemainingCoordinatorLogForActions() throws IOException {
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ public void testProcessRemainingCoordinatorLogForActions() throws IOException, CommandException {
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
xf.setParameter("JOB", "14-200904160239--example-C");
xf.setParameter("ACTION", "14-200904160239--example-C@1");
@@ -191,8 +193,8 @@ public class TestTimestampedMessageParser extends XTestCase {
}
public void testLifecycle() throws Exception {
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter xf = new XLogFilter();
String str1 = "2009-06-24 02:43:13,958 DEBUG _L1_:323 - USER[oozie] GROUP[-] TOKEN[-] APP[example-forkjoinwf] "
+ "JOB[14-200904160239--example-forkjoinwf] ACTION[-] End workflow state change\n";
String str2 = "2009-06-24 02:43:13,961 INFO _L2_:317 - USER[-] GROUP[-] TOKEN[-] APP[example-forkjoinwf] "
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java b/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java
index 8d0fd71..5301784 100644
--- a/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java
+++ b/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java
@@ -17,20 +17,20 @@
*/
package org.apache.oozie.util;
+import org.apache.oozie.command.CommandException;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
-import org.apache.oozie.util.XLogStreamer;
import java.util.ArrayList;
import org.apache.oozie.test.XTestCase;
public class TestXLogFilter extends XTestCase {
- public void testXLogFileter() throws ServiceException {
+ public void testXLogFilter() throws ServiceException, CommandException {
Services services = new Services();
services.init();
try {
- XLogStreamer.Filter xf2 = new XLogStreamer.Filter();
+ XLogFilter xf2 = new XLogFilter();
xf2.constructPattern();
ArrayList<String> a = new ArrayList<String>();
a.add("2009-06-24 02:43:13,958");
@@ -42,14 +42,15 @@ public class TestXLogFilter extends XTestCase {
services.destroy();
}
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
+
assertEquals(7, matches(xf));
xf.setLogLevel(XLog.Level.WARN.toString());
@@ -63,7 +64,7 @@ public class TestXLogFilter extends XTestCase {
xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
assertEquals(2, matches(xf));
- XLogStreamer.Filter xf1 = new XLogStreamer.Filter();
+ XLogFilter xf1 = new XLogFilter();
xf1.setParameter("USER", "oozie");
assertEquals(3, matches(xf1));
@@ -74,7 +75,7 @@ public class TestXLogFilter extends XTestCase {
assertEquals(1, matches(xf1));
}
- private int matches(XLogStreamer.Filter xf) {
+ private int matches(XLogFilter xf) {
xf.constructPattern();
ArrayList<String> a = new ArrayList<String>();
a.add("2009-06-24 02:43:13,958 DEBUG WorkflowRunnerCallable:323 - USER[oozie] GROUP[-] TOKEN[-] APP[example-forkjoinwf] JOB[14-200904160239--example-forkjoinwf] ACTION[-] End workflow state change");
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java b/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java
new file mode 100644
index 0000000..c769883
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
+import org.apache.oozie.service.XLogStreamingService;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XLogUserFilterParam;
+
+public class TestXLogUserFilterParam extends XTestCase {
+
+ final static SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ public void setLogFile() throws Exception {
+ XLogFilter.reset();
+ File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+ InputStream errorLogStream = cl.getResourceAsStream("userLogFilterTestlog.log");
+
+ Properties log4jProps = new Properties();
+ log4jProps.load(is);
+ // prevent conflicts with other tests by changing the log file location
+ log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+ log4jProps.store(new FileOutputStream(log4jFile), "");
+ setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+ new Services().init();
+ File logFile = new File(Services.get().get(XLogService.class).getOozieLogPath(), Services.get()
+ .get(XLogService.class).getOozieLogName());
+ IOUtils.copyStream(errorLogStream, new FileOutputStream(logFile));
+ }
+
+ public void testloglevel_Error() throws Exception {
+ setLogFile();
+ String logLevel = XLogUserFilterParam.LOG_LEVEL + "=ERROR";
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { logLevel });
+
+ XLogFilter xf = new XLogFilter(new XLogUserFilterParam(paramMap));
+ String out = doStreamLog(xf);
+ int count = 0;
+ for (String line : out.split(System.getProperty("line.separator"))) {
+ assertFalse(line.contains("DEBUG"));
+ assertFalse(line.contains("INFO"));
+ count++;
+ }
+ assertEquals(count, 20);
+ }
+
+ // Test multiple log level
+ public void testloglevel_DEBUF_INFO() throws Exception {
+ setLogFile();
+
+ String logLevel = XLogUserFilterParam.LOG_LEVEL + "=DEBUG|INFO";
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { logLevel });
+ XLogFilter xf = new XLogFilter(new XLogUserFilterParam(paramMap));
+ String out = doStreamLog(xf);
+
+ String lines[] = out.split(System.getProperty("line.separator"));
+
+ assertEquals(lines.length, 4);
+
+ assertTrue(lines[1]
+ .startsWith("2014-02-27 02:06:49,215 DEBUG CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] "
+ + "TOKEN[-] APP[-]"));
+ assertTrue(lines[2]
+ .startsWith("2014-02-27 02:06:49,215 DEBUG CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] "
+ + "TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C]"));
+ assertTrue(lines[3]
+ .startsWith("2014-02-27 02:06:49,215 INFO CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] "
+ + "TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C]"));
+
+ }
+
+ // test log level with length
+ public void testloglevel_ErrorWithLen() throws Exception {
+ setLogFile();
+ String logLevel = XLogUserFilterParam.LOG_LEVEL + "=ERROR;" + XLogUserFilterParam.LIMIT + "=1";
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { logLevel });
+
+ XLogFilter xf = new XLogFilter(new XLogUserFilterParam(paramMap));
+ String out = doStreamLog(xf);
+ String lines[] = out.split(System.getProperty("line.separator"));
+
+ assertEquals(lines.length, 1);
+ }
+
+ // test length
+ public void testLength() throws Exception {
+ setLogFile();
+
+ String logLevel = XLogUserFilterParam.LIMIT + "=1";
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { logLevel });
+ XLogFilter xf = new XLogFilter(new XLogUserFilterParam(paramMap));
+ String out = doStreamLog(xf);
+ String lines[] = out.split(System.getProperty("line.separator"));
+
+ assertEquals(lines.length, 1);
+ }
+
+ // Test text search
+ public void testTextSearch() throws Exception {
+ setLogFile();
+ XLogFilter filter = new XLogFilter();
+
+ String param = XLogUserFilterParam.SEARCH_TEXT + "=substitution;" + XLogUserFilterParam.LIMIT + "=2";
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+
+ XLogUserFilterParam logUtil = new XLogUserFilterParam(paramMap);
+ filter.setUserLogFilter(logUtil);
+ String out = doStreamLog(filter);
+ String lines[] = out.split(System.getProperty("line.separator"));
+ assertEquals(lines.length, 2);
+
+ assertTrue(lines[0].contains("E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}"));
+ assertTrue(lines[1].contains("E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}"));
+
+ }
+
+ // Test text search with log level
+ public void testsearchText_logLevel() throws Exception {
+ setLogFile();
+ String param = XLogUserFilterParam.SEARCH_TEXT + "=substitution;" + XLogUserFilterParam.LOG_LEVEL + "=DEBUG";
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+ String out = doStreamLog(filter);
+ String lines[] = out.split(System.getProperty("line.separator"));
+ assertEquals(lines.length, 1);
+
+ assertTrue(lines[0]
+ .contains("2014-02-27 02:06:47,499 DEBUG CoordActionStartXCommand:536 [pool-2-thread-236] - USER[-]"));
+ assertTrue(lines[0].contains("E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}"));
+ }
+
+ // Test logduration, out of range
+ public void testException() throws Exception {
+ File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+ Properties log4jProps = new Properties();
+ log4jProps.load(is);
+ // prevent conflicts with other tests by changing the log file location
+ log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+ log4jProps.store(new FileOutputStream(log4jFile), "");
+ setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+ new Services().init();
+ Services.get().getConf().setInt(XLogFilter.MAX_SCAN_DURATION, 10);
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] {});
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+ Date startDate = new Date();
+ Date endDate = new Date(startDate.getTime() + 60 * 60 * 1000 * 11);
+
+ try {
+ doStreamLog(filter, startDate, endDate);
+ fail("should not come here");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ assertTrue(e.getMessage().contains(
+ "Request log streaming time range is higher than configured."));
+ }
+ }
+
+ // Test log duration - in range
+ public void testNoException_Withrecent() throws Exception {
+ File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+ Properties log4jProps = new Properties();
+ log4jProps.load(is);
+ // prevent conflicts with other tests by changing the log file location
+ log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+ log4jProps.store(new FileOutputStream(log4jFile), "");
+ setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+ new Services().init();
+ Services.get().getConf().setInt(XLogFilter.MAX_SCAN_DURATION, 10);
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ String param = XLogUserFilterParam.RECENT_LOG_OFFSET + "=9";
+
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+ Date startDate = new Date();
+ Date endDate = new Date(startDate.getTime() + 60 * 60 * 1000 * 15);
+
+ try {
+ doStreamLog(filter, startDate, endDate);
+ }
+ catch (Exception e) {
+ fail("should not throw exception");
+ }
+
+ }
+
+ // Test multiple combination
+ public void testCombination() throws Exception {
+ File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+ Properties log4jProps = new Properties();
+ log4jProps.load(is);
+ log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+ log4jProps.store(new FileOutputStream(log4jFile), "");
+ setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+ new Services().init();
+
+ XLogFilter.reset();
+
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ String param = "start=14-02-20 02:06:25,499;end=14-02-27 02:06:47,550;debug;loglevel=ERROR|WARN;recent=3m";
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+ // Param date will be overwritten by user param
+ String out = doStreamLog(filter, new Date(), new Date());
+ assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Feb 27 02:03:47"));
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Feb 27 02:06:47"));
+
+ }
+
+ // Test start and end date, start as absolute
+ public void testStartEnd_EndOffset() throws Exception {
+ File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+ Properties log4jProps = new Properties();
+ log4jProps.load(is);
+ // prevent conflicts with other tests by changing the log file location
+ log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+ log4jProps.store(new FileOutputStream(log4jFile), "");
+ setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+ new Services().init();
+ XLogFilter.reset();
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ String param = "start=14-02-20 02:06:25,499;end=3m;debug";
+
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+ // Param date will be overwritten by user param
+ String out = doStreamLog(filter, dt.parse("14-02-20 02:06:25,499"), new Date());
+ assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Feb 20 02:06:25"));
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Feb 20 02:11:25"));
+
+ }
+
+ // Test start and end date, both offset
+ public void testStartEnd_bothOffset() throws Exception {
+ File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+ Properties log4jProps = new Properties();
+ log4jProps.load(is);
+ // prevent conflicts with other tests by changing the log file location
+ log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+ log4jProps.store(new FileOutputStream(log4jFile), "");
+ setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+ new Services().init();
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ String param = "start=3m;end=13m;debug";
+
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+ // Param date will be overwritten by user param
+ String out = doStreamLog(filter, dt.parse("14-02-20 02:06:25,499"), new Date());
+ assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Feb 20 02:07:25"));
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Feb 20 02:21:25"));
+
+ }
+
+ // Test start and end date, both absolute
+ public void testStartEnd_bothabsoulte() throws Exception {
+ File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+ Properties log4jProps = new Properties();
+ log4jProps.load(is);
+ // prevent conflicts with other tests by changing the log file location
+ log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+ log4jProps.store(new FileOutputStream(log4jFile), "");
+ setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+ new Services().init();
+
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ String param = "start=14-03-20 02:06:25,499;end=14-03-20 02:10:25,499;debug";
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+ // Param date will be overwritten by user param
+ String out = doStreamLog(filter, dt.parse("14-01-20 02:06:25,499"), dt.parse("14-02-20 02:06:25,499"));
+ assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Mar 20 02:06:25"));
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Mar 20 02:10:25"));
+
+ paramMap = new HashMap<String, String[]>();
+ param = "start=14-03-20 02:06:25;end=14-03-20 02:10:25;debug";
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+ filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+ // Param date will be overwritten by user param
+ out = doStreamLog(filter, dt.parse("14-01-20 02:06:25,499"), dt.parse("14-02-20 02:06:25,499"));
+ assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Mar 20 02:06:25"));
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Mar 20 02:10:25"));
+
+
+ }
+
+ // Test start and end date, both absolute
+ public void testStartEnd_startAbsolute() throws Exception {
+ File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+ Properties log4jProps = new Properties();
+ log4jProps.load(is);
+ // prevent conflicts with other tests by changing the log file location
+ log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+ log4jProps.store(new FileOutputStream(log4jFile), "");
+ setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+ new Services().init();
+ Map<String, String[]> paramMap = new HashMap<String, String[]>();
+ String param = "start=14-03-20 02:06:25,499;end=4m;debug";
+ paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+ // Param date will be overwritten by user param
+ String out = doStreamLog(filter, dt.parse("14-01-20 02:06:25,499"), dt.parse("14-02-20 02:06:25,499"));
+ assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Mar 20 02:06:25"));
+ assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Mar 20 02:12:25"));
+
+ }
+
+ private String doStreamLog(XLogFilter xf) throws Exception {
+ return doStreamLog(xf, null, null);
+ }
+
+ private String doStreamLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
+ StringWriter w = new StringWriter();
+ Services.get().get(XLogStreamingService.class)
+ .streamLog(xf, startDate, endDate, w, new HashMap<String, String[]>());
+ return w.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/resources/userLogFilterTestlog.log
----------------------------------------------------------------------
diff --git a/core/src/test/resources/userLogFilterTestlog.log b/core/src/test/resources/userLogFilterTestlog.log
new file mode 100644
index 0000000..bf89038
--- /dev/null
+++ b/core/src/test/resources/userLogFilterTestlog.log
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+2014-02-27 02:06:47,499 DEBUG CoordActionStartXCommand:536 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Failing the action 0601839-140127221758655-oozie-wrkf-C@2. Because E0803 : E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+2014-02-27 02:06:47,499 ERROR CoordActionStartXCommand:536 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758656-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Failing the action 0601839-140127221758655-oozie-wrkf-C@2. Because E0803 : E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+2014-02-27 02:06:47,499 ERROR SubmitXCommand:536 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] XException,
+org.apache.oozie.command.CommandException: E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+ at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:252)
+ at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:72)
+ at org.apache.oozie.command.XCommand.call(XCommand.java:280)
+ at org.apache.oozie.DagEngine.submitJobFromCoordinator(DagEngine.java:131)
+ at org.apache.oozie.command.coord.CoordActionStartXCommand.execute(CoordActionStartXCommand.java:182)
+ at org.apache.oozie.command.coord.CoordActionStartXCommand.execute(CoordActionStartXCommand.java:63)
+ at org.apache.oozie.command.XCommand.call(XCommand.java:280)
+ at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:175)
+ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
+ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
+ at java.lang.Thread.run(Thread.java:722)
+Caused by: java.lang.IllegalStateException: Variable substitution depth too large: 20 ${dniInputDir}
+ at org.apache.oozie.util.XConfiguration.substituteVars(XConfiguration.java:172)
+ at org.apache.oozie.util.XConfiguration.get(XConfiguration.java:120)
+ at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:171)
+ ... 10 more
+2014-02-27 02:06:47,499 WARN CoordActionStartXCommand:542 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] can not create DagEngine for submitting jobs
+org.apache.oozie.DagEngineException: E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+ at org.apache.oozie.DagEngine.submitJobFromCoordinator(DagEngine.java:136)
+ at org.apache.oozie.command.coord.CoordActionStartXCommand.execute(CoordActionStartXCommand.java:182)
+ at org.apache.oozie.command.coord.CoordActionStartXCommand.execute(CoordActionStartXCommand.java:63)
+ at org.apache.oozie.command.XCommand.call(XCommand.java:280)
+ at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:175)
+ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
+ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
+ at java.lang.Thread.run(Thread.java:722)
+Caused by: org.apache.oozie.command.CommandException: E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+ at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:252)
+ at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:72)
+ at org.apache.oozie.command.XCommand.call(XCommand.java:280)
+ at org.apache.oozie.DagEngine.submitJobFromCoordinator(DagEngine.java:131)
+ ... 7 more
+Caused by: java.lang.IllegalStateException: Variable substitution depth too large: 20 ${dniInputDir}
+ at org.apache.oozie.util.XConfiguration.substituteVars(XConfiguration.java:172)
+ at org.apache.oozie.util.XConfiguration.get(XConfiguration.java:120)
+ at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:171)
+ ... 10 more
+2014-02-27 02:06:47,499 ERROR CoordActionStartXCommand:536 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Failing the action 0601839-140127221758655-oozie-wrkf-C@2. Because E0803 : E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+2014-02-27 02:06:49,215 DEBUG CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Queuing [1] commands with delay [0]ms
+2014-02-27 02:06:49,215 DEBUG CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Released lock for [0601839-140127221758655-oozie-wrkf-C] in [coord_action_start]
+2014-02-27 02:06:49,215 INFO CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Released lock for [0601839-140127221758655-oozie-wrkf-C] in [coord_action_start]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/docs/src/site/twiki/DG_CommandLineTool.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_CommandLineTool.twiki b/docs/src/site/twiki/DG_CommandLineTool.twiki
index 222389e..f3231c7 100644
--- a/docs/src/site/twiki/DG_CommandLineTool.twiki
+++ b/docs/src/site/twiki/DG_CommandLineTool.twiki
@@ -67,6 +67,8 @@ usage:
coordinator job; new pausetime value for changing a bundle job
-verbose verbose mode
-update Update coordinator definition and properties
+ -logfilter job log search parameter. Can be specified as -logfilter opt1=val1;opt2=val1;opt3=val1.
+ Supported options are recent, start, end, loglevel, text, limit and debug.
.
oozie jobs <OPTIONS> : jobs status
-auth <arg> select authentication type [SIMPLE|KERBEROS]
@@ -558,6 +560,71 @@ $ oozie job -log <coord_job_id> [-action 1, 3-4, 7-40] (-action is optional.)
</verbatim>
+---+++ Filtering the server logs with logfilter options
+
+User can provide multiple option to filter logs using -logfilter opt1=val1;opt2=val1;opt3=val1. This can be used to fetch only just logs of interest faster as fetching Oozie server logs is slow due to the overhead of pattern matching.
+Available options are:
+ * recent: Specify recent hours/min of logs to scan. The recent offset specified is taken relative to the =end= time specified, job end time or the current system time if the job is still running in that order of precedence. For eg: recent=3h or recent=30m will fetch logs starting 3 hours/30 minutes before the end time and up to the end time. H/h is used to denote hour and M/m is used to denote minutes. If no suffix is specified default is hours.
+ * start: Start time for scanning logs. Default is start time of the job. User can provide a valid date or offset similar to =recent= option. Valid date formats are "yyyy-MM-dd'T'HH:mm'Z'" and "yyyy-MM-dd HH:mm:ss,SSS". When an offset is specified, it is calculated relative to the start time of the job. For eg: start=2h will fetch logs starting 2 hours after the job was started.
+ * end: End time for scanning logs. Default is end time of the job or current system time if the job is still running. User can provide a valid date or offset similar to =start= option. When an offset is specified, it is calculated relative to start time i.e job start time . For eg: end=2h will fetch logs from start time and start time plus 2 hours.
+ * loglevel : Multiple log levels separated by "|" can be specified. Supported log levels are ALL, DEBUG, ERROR, INFO, TRACE, WARN, FATAL.
+ * text: String to search in logs.
+ * limit : Limit number of line to be searched. Log search will end when when n lines(excluding stack-trace) have been matched.
+ * debug : Prints debug information on the log files, time ranges and patterns being searched for. Can be used to debug if expected logs are not shown with the filter options provided.
+
+Examples.
+Searching log with log level ERROR or WARN will only give log with Error and Warning (with stack-trace) only.
+This will be useful if job has failed and user want to find error logs with exception.
+<verbatim>
+
+$ ./oozie job -log 0000006-140319184715726-oozie-puru-W -logfilter loglevel=WARN\;limit=3 -oozie http://localhost:11000/oozie/
+2014-03-20 10:01:52,977 WARN ActionStartXCommand:542 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[map-reduce-wf] JOB[0000006-140319184715726-oozie-puru-W] ACTION[0000006-140319184715726-oozie-puru-W@:start:] [***0000006-140319184715726-oozie-puru-W@:start:***]Action status=DONE
+2014-03-20 10:01:52,977 WARN ActionStartXCommand:542 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[map-reduce-wf] JOB[0000006-140319184715726-oozie-puru-W] ACTION[0000006-140319184715726-oozie-puru-W@:start:] [***0000006-140319184715726-oozie-puru-W@:start:***]Action updated in DB!
+2014-03-20 10:01:53,189 WARN ActionStartXCommand:542 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[map-reduce-wf] JOB[0000006-140319184715726-oozie-puru-W] ACTION[0000006-140319184715726-oozie-puru-W@mr-node-1] Error starting action [mr-node-1]. ErrorType [TRANSIENT], ErrorCode [JA009], Message [JA009: java.io.IOException: java.io.IOException: Queue "aaadefault" does not exist
+ at org.apache.hadoop.mapred.JobTracker.submitJob(JobTracker.java:3615)
+ at org.apache.hadoop.mapred.JobTracker.submitJob(JobTracker.java:3561)
+ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
+ at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
+ at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
+ at java.lang.reflect.Method.invoke(Method.java:597)
+ at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:587)
+ at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1432)
+ at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1428)
+ at java.security.AccessController.doPrivileged(Native Method)
+ at javax.security.auth.Subject.doAs(Subject.java:394)
+ at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
+ at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1426)
+Caused by: java.io.IOException: Queue "aaadefault" does not exist
+ at org.apache.hadoop.mapred.JobInProgress.<init>(JobInProgress.java:433)
+ at org.apache.hadoop.mapred.JobTracker.submitJob(JobTracker.java:3613)
+ ... 12 more
+$
+</verbatim>
+
+Search with specific text and recent option.
+<verbatim>
+$ ./oozie job -log 0000003-140319184715726-oozie-puru-C -logfilter text=Missing\;limit=4\;recent=1h -oozie http://localhost:11000/oozie/
+2014-03-20 09:59:50,329 INFO CoordActionInputCheckXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000003-140319184715726-oozie-puru-C] ACTION[0000003-140319184715726-oozie-puru-C@1] [0000003-140319184715726-oozie-puru-C@1]::CoordActionInputCheck:: Missing deps:hdfs://localhost:9000/user/purushah/examples/input-data/rawLogs/
+2014-03-20 09:59:50,330 INFO CoordActionInputCheckXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000003-140319184715726-oozie-puru-C] ACTION[0000003-140319184715726-oozie-puru-C@1] [0000003-140319184715726-oozie-puru-C@1]::ActionInputCheck:: In checkListOfPaths: hdfs://localhost:9000/user/purushah/examples/input-data/rawLogs/ is Missing.
+2014-03-20 10:02:19,087 INFO CoordActionInputCheckXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000003-140319184715726-oozie-puru-C] ACTION[0000003-140319184715726-oozie-puru-C@2] [0000003-140319184715726-oozie-puru-C@2]::CoordActionInputCheck:: Missing deps:hdfs://localhost:9000/user/purushah/examples/input-data/rawLogs/
+2014-03-20 10:02:19,088 INFO CoordActionInputCheckXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000003-140319184715726-oozie-puru-C] ACTION[0000003-140319184715726-oozie-puru-C@2] [0000003-140319184715726-oozie-puru-C@2]::ActionInputCheck:: In checkListOfPaths: hdfs://localhost:9000/user/purushah/examples/input-data/rawLogs/ is Missing.
+$
+</verbatim>
+
+Search example with specific date range.
+<verbatim>
+$ ./oozie job -log 0000003-140319184715726-oozie-puru-C -logfilter "start=2014-03-20 10:00:57,063;end=2014-03-20 10:10:57,063" -oozie http://localhost:11000/oozie/
+2014-03-20 10:00:57,063 INFO CoordActionUpdateXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000003-140319184715726-oozie-puru-C] ACTION[0000003-140319184715726-oozie-puru-C@1] Updating Coordintaor action id :0000003-140319184715726-oozie-puru-C@1 status to KILLED, pending = 0
+2014-03-20 10:02:18,967 INFO CoordMaterializeTransitionXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[aggregator-coord] JOB[0000003-140319184715726-oozie-puru-C] ACTION[-] materialize actions for tz=Coordinated Universal Time,
+ start=Thu Dec 31 18:00:00 PST 2009, end=Thu Dec 31 19:00:00 PST 2009,
+ timeUnit 12,
+ frequency :60:MINUTE,
+ lastActionNumber 1
+2014-03-20 10:02:18,971 WARN CoordELFunctions:542 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[aggregator-coord] JOB[0000003-140319184715726-oozie-puru-C] ACTION[-] If the initial instance of the dataset is later than the current-instance specified, such as coord:current(-200) in this case, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time.
+2014-03-20 10:02:18,975 INFO CoordMaterializeTransitionXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[aggregator-coord] JOB[0000003-140319184715726-oozie-puru-C] ACTION[-] [0000003-140319184715726-oozie-puru-C]: all actions have been materialized, set pending to true
+2014-03-20 10:02:18,975 INFO CoordMaterializeTransitionXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[aggregator-coord] JOB[0000003-140319184715726-oozie-puru-C] ACTION[-] Coord Job status updated to = RUNNING
+</verbatim>
+
---+++ Dryrun of Coordinator Job
* This feature is only supported in Oozie 2.0 or later.
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/docs/src/site/twiki/DG_WorkflowReRun.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_WorkflowReRun.twiki b/docs/src/site/twiki/DG_WorkflowReRun.twiki
index e6cca80..88d982b 100644
--- a/docs/src/site/twiki/DG_WorkflowReRun.twiki
+++ b/docs/src/site/twiki/DG_WorkflowReRun.twiki
@@ -16,6 +16,11 @@
* If secured hadoop version is used, the following two properties needs to be specified as well
* mapreduce.jobtracker.kerberos.principal
* dfs.namenode.kerberos.principal.
+ * Configurations can be passed as -D param.
+<verbatim>
+$ oozie job -oozie http://localhost:11000/oozie -rerun 14-20090525161321-oozie-joe -Doozie.wf.rerun.skip.nodes=<>
+</verbatim>
+
---++ Pre-Conditions
* Workflow with id wfId should exist.
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/docs/src/site/twiki/WebServicesAPI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/WebServicesAPI.twiki b/docs/src/site/twiki/WebServicesAPI.twiki
index c93fc7f..6730255 100644
--- a/docs/src/site/twiki/WebServicesAPI.twiki
+++ b/docs/src/site/twiki/WebServicesAPI.twiki
@@ -1344,6 +1344,17 @@ Content-Type: text/plain;charset=UTF-8
...
</verbatim>
+---++++ Filtering the server logs with logfilter options
+User can provide multiple option to filter logs using -logfilter opt1=val1;opt2=val1;opt3=val1. This can be used to fetch only just logs of interest faster as fetching Oozie server logs is slow due to the overhead of pattern matching.
+
+<verbatim>
+GET /oozie/v1/job/0000003-140319184715726-oozie-puru-C?show=log&logfilter=limit=3;loglevel=WARN
+</verbatim>
+
+
+Refer to the [[DG_CommandLineTool#Filtering_the_server_logs_with_logfilter_options][Filtering the server logs with logfilter options]] for more details.
+
+
---++++ Job graph
An =HTTP GET= request returns the image of the workflow DAG (rendered as a PNG image).
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index d380863..eff6224 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1737 Oozie log streaming is slow (puru via rohini)
OOZIE-1794 java-opts and java-opt in the Java action don't always work properly in YARN (rkanter)
OOZIE-1799 Document hcatalog integration steps for Oozie in a secure cluster (venkatnrangan via bzhang)
OOZIE-1585 Upgrade oozie to pig 0.12.1 (bzhang)
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/webapp/src/main/webapp/oozie-console.js
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/oozie-console.js b/webapp/src/main/webapp/oozie-console.js
index 5f4f847..764d888 100644
--- a/webapp/src/main/webapp/oozie-console.js
+++ b/webapp/src/main/webapp/oozie-console.js
@@ -43,11 +43,15 @@ Ext.override(Ext.Component, {
stateful : false
});
-function getLogs(url, textArea, shouldParseResponse, errorMsg) {
+function getLogs(url, searchFilter, logStatus, textArea, shouldParseResponse, errorMsg) {
textArea.getEl().dom.value = '';
+ url = url + "&logfilter=" +searchFilter;
+ logStatus.getEl().dom.innerText = "Log status : Loading... done";
if (!errorMsg) {
errorMsg = "Fatal Error. Can't load logs.";
+ logStatus.getEl().dom.innerText = "Log status : Errored out";
+
}
if (!window.XMLHttpRequest) {
Ext.Ajax.request({
@@ -56,13 +60,17 @@ function getLogs(url, textArea, shouldParseResponse, errorMsg) {
success : function(response, request) {
if (shouldParseResponse) {
processAndDisplayLog(response.responseText, textArea);
+ logStatus.getEl().dom.innerText = "Log status : Loading... done";
+
} else {
textArea.getEl().dom.value = response.responseText;
+ logStatus.getEl().dom.innerText = "Log status : Loading... done";
}
},
failure : function() {
textArea.getEl().dom.value = errorMsg;
+ logStatus.getEl().dom.innerText = "Log status : Errored out";
}
});
@@ -85,6 +93,11 @@ function getLogs(url, textArea, shouldParseResponse, errorMsg) {
if (xhr.status != 200 && xhr.status != 0) {
var errorText = xhr.getResponseHeader('oozie-error-message');
textArea.getEl().dom.value = "Error :\n" + (errorText ? errorText : xhr.responseText);
+ logStatus.getEl().dom.innerText = "Log status : Errored out";
+
+ }
+ if (xhr.readyState =4 && xhr.status == 200) {
+ logStatus.getEl().dom.innerText = "Log status : Loading... done";
}
} catch (e) {
}
@@ -349,7 +362,9 @@ function jobDetailsPopup(response, request) {
width: 1035,
height: 400,
autoScroll: true,
- emptyText: "Loading..."
+ emptyText: "To optimize log searching, you can provide search filter options as opt1=val1;opt2=val1;opt3=val1.\n" +
+ "Available options are recent, start, end, loglevel, text, limit and debug.\n" +
+ "For more detail refer documentation (/oozie/docs/DG_CommandLineTool.html#Filtering_the_server_logs_with_logfilter_options)"
});
function fetchDefinition(workflowId) {
Ext.Ajax.request({
@@ -361,8 +376,8 @@ function jobDetailsPopup(response, request) {
});
}
- function fetchLogs(workflowId) {
- getLogs(getOozieBase() + 'job/' + workflowId + "?show=log", jobLogArea, false, null);
+ function fetchLogs(workflowId, logStatus) {
+ getLogs(getOozieBase() + 'job/' + workflowId + "?show=log", searchFilterBox.getValue(), logStatus, jobLogArea, false, null);
}
var jobDetails = eval("(" + response.responseText + ")");
@@ -747,6 +762,31 @@ function jobDetailsPopup(response, request) {
autoScroll: true,
style : { overflow: 'auto', overflowX: 'hidden' }
});
+
+ var searchFilter = new Ext.form.Label({
+ text : 'Enter Search Filter'
+ });
+
+ var searchFilterBox = new Ext.form.TextField({
+ fieldLabel: 'searchFilterBox',
+ name: 'searchFilterBox',
+ width: 350,
+ value: ''
+ });
+
+ var logStatus = new Ext.form.Label({
+ text : 'Log Status : '
+ });
+
+
+ var getLogButton = new Ext.Button({
+ text: 'Get Logs',
+ ctCls: 'x-btn-over',
+ handler: function() {
+ fetchLogs(workflowId, logStatus);
+ }
+ });
+
var isLoadedDAG = false;
var jobDetailsTab = new Ext.TabPanel({
@@ -774,13 +814,7 @@ function jobDetailsPopup(response, request) {
}, {
title: 'Job Log',
items: jobLogArea,
- tbar: [ {
- text: " ",
- icon: 'ext-2.2/resources/images/default/grid/refresh.gif',
- handler: function() {
- fetchLogs(workflowId);
- }
- }]
+ tbar: [ searchFilter, searchFilterBox, getLogButton, {xtype: 'tbfill'}, logStatus]
}, {
title: 'Job DAG',
items: imageContainer,
@@ -799,9 +833,6 @@ function jobDetailsPopup(response, request) {
jobs_grid.setVisible(true);
return;
}
- if (selectedTab.title == 'Job Log') {
- fetchLogs(workflowId);
- }
else if (selectedTab.title == 'Job Definition') {
fetchDefinition(workflowId);
} else if(selectedTab.title == 'Job DAG') {
@@ -853,13 +884,17 @@ function coordJobDetailsPopup(response, request) {
height: 400,
autoScroll: true,
emptyText: "Enter the list of actions in the format similar to 1,3-4,7-40 to get logs for specific coordinator actions. " +
- "To get the log for the coordinator job, leave the actions field empty."
+ "To get the log for the coordinator job, leave the actions field empty.\n\n" +
+ "To optimize log searching, you can provide search filter options as opt1=val1;opt2=val1;opt3=val1.\n" +
+ "Available options are recent, start, end, loglevel, text, limit and debug.\n" +
+ "For more detail refer documentation (/oozie/docs/DG_CommandLineTool.html#Filtering_the_server_logs_with_logfilter_options)"
});
var getLogButton = new Ext.Button({
- text: 'Get logs',
- handler: function() {
+ text: 'Get Logs',
+ ctCls: 'x-btn-over',
+ handler: function() {
fetchLogs(coordJobId, actionsTextBox.getValue());
- }
+ }
});
var actionsTextBox = new Ext.form.TextField({
fieldLabel: 'ActionsList',
@@ -872,6 +907,22 @@ function coordJobDetailsPopup(response, request) {
text : 'Enter action list : '
});
+ var searchFilter = new Ext.form.Label({
+ text : 'Enter Search Filter'
+ });
+
+ var searchFilterBox = new Ext.form.TextField({
+ fieldLabel: 'searchFilterBox',
+ name: 'searchFilterBox',
+ width: 350,
+ value: ''
+ });
+
+ var logStatus = new Ext.form.Label({
+ text : 'Log Status : '
+ });
+
+
function fetchDefinition(coordJobId) {
Ext.Ajax.request({
url: getOozieBase() + 'job/' + coordJobId + "?show=definition",
@@ -884,10 +935,10 @@ function coordJobDetailsPopup(response, request) {
function fetchLogs(coordJobId, actionsList) {
if (actionsList == '') {
getLogs(getOozieBase() + 'job/' + coordJobId + "?show=log",
- jobLogArea, true, null);
+ searchFilterBox.getValue(), logStatus, jobLogArea, true, null);
} else {
getLogs(getOozieBase() + 'job/' + coordJobId
- + "?show=log&type=action&scope=" + actionsList, jobLogArea,
+ + "?show=log&type=action&scope=" + actionsList, searchFilterBox.getValue(), logStatus, jobLogArea,
true,
'Action List format is wrong. Format should be similar to 1,3-4,7-40');
}
@@ -1279,9 +1330,9 @@ function coordJobDetailsPopup(response, request) {
title: 'Coord Job Info',
items: fs
},{
- title: 'Coord Job Definition',
+ title: 'Coord Job Definition',
items: jobDefinitionArea
- },{
+ },{
title: 'Coord Job Configuration',
items: new Ext.form.TextArea({
fieldLabel: 'Configuration',
@@ -1296,7 +1347,7 @@ function coordJobDetailsPopup(response, request) {
title: 'Coord Job Log',
items: jobLogArea,
tbar: [
- actionsText,actionsTextBox, getLogButton]
+ actionsText,actionsTextBox, searchFilter, searchFilterBox, getLogButton, {xtype: 'tbfill'}, logStatus]
},{
title: 'Coord Action Reruns',
items: rerunsUnit,
@@ -1331,7 +1382,7 @@ function coordJobDetailsPopup(response, request) {
}
function bundleJobDetailsPopup(response, request) {
- var jobDefinitionArea = new Ext.form.TextArea({
+ var jobDefinitionArea = new Ext.form.TextArea({
fieldLabel: 'Definition',
editable: false,
name: 'definition',
@@ -1499,9 +1550,36 @@ function bundleJobDetailsPopup(response, request) {
width: 1010,
height: 400,
autoScroll: true,
- emptyText: "Loading..."
+ emptyText: "To optimize log searching, you can provide search filter options as opt1=val1;opt2=val1;opt3=val1.\n" +
+ "Available options are recent, start, end, loglevel, text, limit and debug.\n" +
+ "For more detail refer documentation (/oozie/docs/DG_CommandLineTool.html#Filtering_the_server_logs_with_logfilter_options)"
+ });
+
+ var searchFilter = new Ext.form.Label({
+ text : 'Enter Search Filter'
+ });
+
+ var searchFilterBox = new Ext.form.TextField({
+ fieldLabel: 'searchFilterBox',
+ name: 'searchFilterBox',
+ width: 350,
+ value: ''
+ });
+
+ var logStatus = new Ext.form.Label({
+ text : 'Log Status : '
+ });
+
+
+ var getLogButton = new Ext.Button({
+ text: 'Get Logs',
+ ctCls: 'x-btn-over',
+ handler: function() {
+ getLogs(getOozieBase() + 'job/' + bundleJobId + "?show=log", searchFilterBox.getValue(), logStatus, jobLogArea, false, null);
+ }
});
+
var jobDetailsTab = new Ext.TabPanel({
activeTab: 0,
autoHeight: true,
@@ -1511,8 +1589,8 @@ function bundleJobDetailsPopup(response, request) {
items: fs
},{
title: 'Bundle Job Definition',
- items: jobDefinitionArea
- },{
+ items: jobDefinitionArea
+ },{
title: 'Bundle Job Configuration',
items: new Ext.form.TextArea({
fieldLabel: 'Configuration',
@@ -1526,14 +1604,8 @@ function bundleJobDetailsPopup(response, request) {
},{
title: 'Bundle Job Log',
items: jobLogArea,
- tbar: [ {
- text: " ",
- icon: 'ext-2.2/resources/images/default/grid/refresh.gif',
- handler: function() {
- getLogs(getOozieBase() + 'job/' + bundleJobId + "?show=log", jobLogArea, false, null);
- }
- }]
- }]
+ tbar: [searchFilter, searchFilterBox, getLogButton, {xtype: 'tbfill'}, logStatus]
+ }]
});
jobDetailsTab.addListener("tabchange", function(panel, selectedTab) {
@@ -1541,9 +1613,6 @@ function bundleJobDetailsPopup(response, request) {
coord_jobs_grid.setVisible(true);
return;
}
- else if (selectedTab.title == 'Bundle Job Log') {
- getLogs(getOozieBase() + 'job/' + bundleJobId + "?show=log", jobLogArea, false, null);
- }
else if (selectedTab.title == 'Bundle Job Definition') {
fetchDefinition(bundleJobId);
}
@@ -1662,7 +1731,7 @@ function showConfigurationInWindow(dataObject, windowTitle) {
* create the coord jobs store
*/
var coord_jobs_store = new Ext.data.JsonStore({
- baseParams: {
+ baseParams: {
jobtype: "coord",
filter: "status=RUNNING;status=RUNNINGWITHERROR",
timezone: getTimeZone()
@@ -1704,7 +1773,7 @@ jobs_store.proxy.conn.method = "GET";
*/
var bundle_jobs_store = new Ext.data.JsonStore({
- baseParams: {
+ baseParams: {
jobtype: "bundle",
filter: "status=RUNNING;status=RUNNINGWITHERROR",
timezone: getTimeZone()
@@ -2319,7 +2388,7 @@ function initConsole() {
width: 80,
sortable: true,
dataIndex: 'status'
- }, {
+ }, {
header: "User",
width: 80,
sortable: true,
[2/2] git commit: OOZIE-1737 Oozie log streaming is slow (puru via
rohini)
Posted by ro...@apache.org.
OOZIE-1737 Oozie log streaming is slow (puru via rohini)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f9176a01
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f9176a01
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f9176a01
Branch: refs/heads/master
Commit: f9176a01be214ecf6409e239f76806ed01be179d
Parents: 2ef1711
Author: Rohini Palaniswamy <ro...@yahoo-inc.com>
Authored: Thu Apr 24 14:42:10 2014 -0700
Committer: Rohini Palaniswamy <ro...@yahoo-inc.com>
Committed: Thu Apr 24 14:42:10 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/oozie/cli/OozieCLI.java | 14 +-
.../org/apache/oozie/client/OozieClient.java | 27 +-
.../apache/oozie/client/rest/RestConstants.java | 2 +
.../java/org/apache/oozie/BundleEngine.java | 26 +-
.../java/org/apache/oozie/BundleJobBean.java | 18 +
.../org/apache/oozie/CoordinatorEngine.java | 44 +-
.../main/java/org/apache/oozie/DagEngine.java | 28 +-
.../oozie/service/DagXLogInfoService.java | 10 +-
.../org/apache/oozie/service/XLogService.java | 10 +-
.../oozie/service/XLogStreamingService.java | 5 +-
.../oozie/service/ZKXLogStreamingService.java | 10 +-
.../org/apache/oozie/servlet/V1JobServlet.java | 23 +-
.../util/SimpleTimestampedMessageParser.java | 2 +-
.../oozie/util/TimestampedMessageParser.java | 25 +-
.../java/org/apache/oozie/util/XLogFilter.java | 336 ++++++++++++++++
.../org/apache/oozie/util/XLogStreamer.java | 180 ++-------
.../apache/oozie/util/XLogUserFilterParam.java | 300 ++++++++++++++
core/src/main/resources/oozie-default.xml | 21 +
.../oozie/TestCoordinatorEngineStreamLog.java | 9 +-
.../oozie/service/TestXLogStreamingService.java | 26 +-
.../service/TestZKXLogStreamingService.java | 38 +-
.../org/apache/oozie/util/TestLogStreamer.java | 77 ++--
.../TestSimplifiedTimestampedMessageParser.java | 32 +-
.../util/TestTimestampedMessageParser.java | 62 +--
.../org/apache/oozie/util/TestXLogFilter.java | 27 +-
.../oozie/util/TestXLogUserFilterParam.java | 403 +++++++++++++++++++
.../src/test/resources/userLogFilterTestlog.log | 62 +++
docs/src/site/twiki/DG_CommandLineTool.twiki | 67 +++
docs/src/site/twiki/DG_WorkflowReRun.twiki | 5 +
docs/src/site/twiki/WebServicesAPI.twiki | 11 +
release-log.txt | 1 +
webapp/src/main/webapp/oozie-console.js | 151 +++++--
32 files changed, 1671 insertions(+), 381 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
index f26e33b..5260ab7 100644
--- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
+++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
@@ -302,6 +302,10 @@ public class OozieCLI {
Option timezone = new Option(TIME_ZONE_OPTION, true,
"use time zone with the specified ID (default GMT).\nSee 'oozie info -timezones' for a list");
Option log = new Option(LOG_OPTION, true, "job log");
+ Option logFilter = new Option(
+ RestConstants.LOG_FILTER_OPTION, true,
+ "job log search parameter. Can be specified as -logfilter opt1=val1;opt2=val1;opt3=val1. "
+ + "Supported options are recent, start, end, loglevel, text, limit and debug");
Option definition = new Option(DEFINITION_OPTION, true, "job definition");
Option config_content = new Option(CONFIG_CONTENT_OPTION, true, "job configuration");
Option verbose = new Option(VERBOSE_OPTION, false, "verbose mode");
@@ -358,6 +362,7 @@ public class OozieCLI {
jobOptions.addOption(rerun_nocleanup);
jobOptions.addOption(getAllWorkflows);
jobOptions.addOptionGroup(actions);
+ jobOptions.addOption(logFilter);
addAuthOptions(jobOptions);
jobOptions.addOption(showdiff);
@@ -1056,6 +1061,10 @@ public class OozieCLI {
}
else if (options.contains(LOG_OPTION)) {
PrintStream ps = System.out;
+ String logFilter = null;
+ if (options.contains(RestConstants.LOG_FILTER_OPTION)) {
+ logFilter = commandLine.getOptionValue(RestConstants.LOG_FILTER_OPTION);
+ }
if (commandLine.getOptionValue(LOG_OPTION).contains("-C")) {
String logRetrievalScope = null;
String logRetrievalType = null;
@@ -1068,7 +1077,8 @@ public class OozieCLI {
logRetrievalScope = commandLine.getOptionValue(DATE_OPTION);
}
try {
- wc.getJobLog(commandLine.getOptionValue(LOG_OPTION), logRetrievalType, logRetrievalScope, ps);
+ wc.getJobLog(commandLine.getOptionValue(LOG_OPTION), logRetrievalType, logRetrievalScope,
+ logFilter, ps);
}
finally {
ps.close();
@@ -1076,7 +1086,7 @@ public class OozieCLI {
}
else {
if (!options.contains(ACTION_OPTION) && !options.contains(DATE_OPTION)) {
- wc.getJobLog(commandLine.getOptionValue(LOG_OPTION), null, null, ps);
+ wc.getJobLog(commandLine.getOptionValue(LOG_OPTION), null, null, logFilter, ps);
}
else {
throw new OozieCLIException("Invalid options provided for log retrieval. " + ACTION_OPTION
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/client/src/main/java/org/apache/oozie/client/OozieClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java
index e5afdc7..211b5ad 100644
--- a/client/src/main/java/org/apache/oozie/client/OozieClient.java
+++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java
@@ -910,21 +910,35 @@ public class OozieClient {
* @param jobId job Id.
* @param logRetrievalType Based on which filter criteria the log is retrieved
* @param logRetrievalScope Value for the retrieval type
+ * @param logFilter log filter
+ * @param ps Printstream of command line interface
+ * @throws OozieClientException thrown if the job info could not be retrieved.
+ */
+ public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter,
+ PrintStream ps) throws OozieClientException {
+ new JobLog(jobId, logRetrievalType, logRetrievalScope, logFilter, ps).call();
+ }
+
+ /**
+ * Get the log of a job.
+ *
+ * @param jobId job Id.
+ * @param logRetrievalType Based on which filter criteria the log is retrieved
+ * @param logRetrievalScope Value for the retrieval type
* @param ps Printstream of command line interface
* @throws OozieClientException thrown if the job info could not be retrieved.
*/
public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps)
throws OozieClientException {
- new JobLog(jobId, logRetrievalType, logRetrievalScope, ps).call();
+ getJobLog(jobId, logRetrievalType, logRetrievalScope, null, ps);
}
private class JobLog extends JobMetadata {
JobLog(String jobId) {
super(jobId, RestConstants.JOB_SHOW_LOG);
}
-
- JobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) {
- super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, ps);
+ JobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter, PrintStream ps) {
+ super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, logFilter, ps);
}
}
@@ -984,10 +998,11 @@ public class OozieClient {
metaType));
}
- JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, PrintStream ps) {
+ JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, String logFilter,
+ PrintStream ps) {
super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM,
- logRetrievalScope));
+ logRetrievalScope, RestConstants.LOG_FILTER_OPTION, logFilter));
printStream = ps;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
index b0dd6cb..808f9b2 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
@@ -174,4 +174,6 @@ public interface RestConstants {
public static final String ALL_WORKFLOWS_FOR_COORD_ACTION = "allruns";
+ public static final String LOG_FILTER_OPTION = "logfilter";
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java
index 5bf1538..67f4aba 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
@@ -51,9 +52,10 @@ import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogUserFilterParam;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogStreamer;
import com.google.common.annotations.VisibleForTesting;
@@ -241,20 +243,26 @@ public class BundleEngine extends BaseEngine {
* @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer)
*/
@Override
- public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, BundleEngineException {
- XLogStreamer.Filter filter = new XLogStreamer.Filter();
- filter.setParameter(DagXLogInfoService.JOB, jobId);
+ public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
+ BundleEngineException {
BundleJobBean job;
try {
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
+ filter.setParameter(DagXLogInfoService.JOB, jobId);
job = new BundleJobXCommand(jobId).call();
+ Date lastTime = null;
+ if (job.isTerminalStatus()) {
+ lastTime = job.getLastModifiedTime();
+ }
+ if (lastTime == null) {
+ lastTime = new Date();
+ }
+ Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer, params);
}
- catch (CommandException ex) {
- throw new BundleEngineException(ex);
+ catch (Exception ex) {
+ throw new IOException(ex);
}
-
- Services.get().get(XLogStreamingService.class)
- .streamLog(filter, job.getCreatedTime(), new Date(), writer, params);
}
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/BundleJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java b/core/src/main/java/org/apache/oozie/BundleJobBean.java
index 4c37e57..0f1670a 100644
--- a/core/src/main/java/org/apache/oozie/BundleJobBean.java
+++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java
@@ -798,4 +798,22 @@ public class BundleJobBean implements Writable, BundleJob, JsonBean {
return DateUtils.toDate(startTimestamp);
}
+ /**
+ * @return true if in terminal status
+ */
+ public boolean isTerminalStatus() {
+ boolean isTerminal = false;
+ switch (getStatus()) {
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ case DONEWITHERROR:
+ isTerminal = true;
+ break;
+ default:
+ isTerminal = false;
+ break;
+ }
+ return isTerminal;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index 81d98bf..e627a95 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
@@ -57,9 +58,10 @@ import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
+import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogUserFilterParam;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogStreamer;
import com.google.common.annotations.VisibleForTesting;
@@ -271,12 +273,26 @@ public class CoordinatorEngine extends BaseEngine {
* java.io.Writer)
*/
@Override
- public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, BaseEngineException {
- XLogStreamer.Filter filter = new XLogStreamer.Filter();
- filter.setParameter(DagXLogInfoService.JOB, jobId);
+ public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
+ BaseEngineException {
- CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
- Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer, params);
+ try {
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
+ filter.setParameter(DagXLogInfoService.JOB, jobId);
+ Date lastTime = null;
+ CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
+ if (job.isTerminalStatus()) {
+ lastTime = job.getLastModifiedTime();
+ }
+ if (lastTime == null) {
+ lastTime = new Date();
+ }
+ Services.get().get(XLogStreamingService.class)
+ .streamLog(filter, job.getCreatedTime(), lastTime, writer, params);
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
}
/**
@@ -296,7 +312,8 @@ public class CoordinatorEngine extends BaseEngine {
Date startTime = null;
Date endTime = null;
- XLogStreamer.Filter filter = new XLogStreamer.Filter();
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
+
filter.setParameter(DagXLogInfoService.JOB, jobId);
if (logRetrievalScope != null && logRetrievalType != null) {
// if coordinator action logs are to be retrieved based on action id range
@@ -371,6 +388,7 @@ public class CoordinatorEngine extends BaseEngine {
startTime = actionBean.getCreatedTime();
endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
.getLastModifiedTime();
+ filter.setActionList(true);
}
else if (actionSet != null && actionSet.size() > 0) {
List<String> tempList = new ArrayList<String>(actionSet);
@@ -383,6 +401,7 @@ public class CoordinatorEngine extends BaseEngine {
startTime = getCoordAction(tempList.get(0)).getCreatedTime();
endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, tempList.get(0),
tempList.get(tempList.size() - 1));
+ filter.setActionList(true);
}
}
// if coordinator action logs are to be retrieved based on date range
@@ -414,12 +433,12 @@ public class CoordinatorEngine extends BaseEngine {
orSeparatedActions.append(")");
}
filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
-
if (coordActionIdList != null && coordActionIdList.size() == 1) {
CoordinatorActionBean actionBean = getCoordAction(coordActionIdList.get(0));
startTime = actionBean.getCreatedTime();
endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
.getLastModifiedTime();
+ filter.setActionList(true);
}
else if (coordActionIdList != null && coordActionIdList.size() > 0) {
Collections.sort(coordActionIdList, new Comparator<String>() {
@@ -431,6 +450,7 @@ public class CoordinatorEngine extends BaseEngine {
startTime = getCoordAction(coordActionIdList.get(0)).getCreatedTime();
endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, coordActionIdList.get(0),
coordActionIdList.get(coordActionIdList.size() - 1));
+ filter.setActionList(true);
}
}
}
@@ -440,10 +460,14 @@ public class CoordinatorEngine extends BaseEngine {
startTime = job.getCreatedTime();
}
if (endTime == null) {
- endTime = new Date();
+ if (job.isTerminalStatus()) {
+ endTime = job.getLastModifiedTime();
+ }
+ if (endTime == null) {
+ endTime = new Date();
+ }
}
}
- //job.getActions()
Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/DagEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java
index cad5ddd..f6e5b4f 100644
--- a/core/src/main/java/org/apache/oozie/DagEngine.java
+++ b/core/src/main/java/org/apache/oozie/DagEngine.java
@@ -17,9 +17,9 @@
*/
package org.apache.oozie;
-import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.service.DagXLogInfoService;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
@@ -47,6 +47,8 @@ import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogUserFilterParam;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XConfiguration;
@@ -400,15 +402,21 @@ public class DagEngine extends BaseEngine {
* @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
*/
@Override
- public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, DagEngineException {
- XLogStreamer.Filter filter = new XLogStreamer.Filter();
- filter.setParameter(DagXLogInfoService.JOB, jobId);
- WorkflowJob job = getJob(jobId);
- Date lastTime = job.getEndTime();
- if (lastTime == null) {
- lastTime = job.getLastModifiedTime();
- }
- Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer, params);
+ public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
+ DagEngineException {
+ try {
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
+ filter.setParameter(DagXLogInfoService.JOB, jobId);
+ WorkflowJob job = getJob(jobId);
+ Date lastTime = job.getEndTime();
+ if (lastTime == null) {
+ lastTime = job.getLastModifiedTime();
+ }
+ Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer, params);
+ }
+ catch (Exception e) {
+ throw new IOException(e);
+ }
}
private static final Set<String> FILTER_NAMES = new HashSet<String>();
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/service/DagXLogInfoService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/DagXLogInfoService.java b/core/src/main/java/org/apache/oozie/service/DagXLogInfoService.java
index 1b18140..a5fad9d 100644
--- a/core/src/main/java/org/apache/oozie/service/DagXLogInfoService.java
+++ b/core/src/main/java/org/apache/oozie/service/DagXLogInfoService.java
@@ -17,7 +17,7 @@
*/
package org.apache.oozie.service;
-import org.apache.oozie.util.XLogStreamer;
+import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.XLog;
@@ -58,10 +58,10 @@ public class DagXLogInfoService implements Service {
XLog.Info.defineParameter(JOB);
XLog.Info.defineParameter(ACTION);
- XLogStreamer.Filter.defineParameter(TOKEN);
- XLogStreamer.Filter.defineParameter(APP);
- XLogStreamer.Filter.defineParameter(JOB);
- XLogStreamer.Filter.defineParameter(ACTION);
+ XLogFilter.defineParameter(TOKEN);
+ XLogFilter.defineParameter(APP);
+ XLogFilter.defineParameter(JOB);
+ XLogFilter.defineParameter(ACTION);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/service/XLogService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/XLogService.java b/core/src/main/java/org/apache/oozie/service/XLogService.java
index f9383d0..403a089 100644
--- a/core/src/main/java/org/apache/oozie/service/XLogService.java
+++ b/core/src/main/java/org/apache/oozie/service/XLogService.java
@@ -20,10 +20,10 @@ package org.apache.oozie.service;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.BuildInfo;
import org.apache.oozie.ErrorCode;
@@ -174,9 +174,9 @@ public class XLogService implements Service, Instrumentable {
XLog.Info.reset();
XLog.Info.defineParameter(USER);
XLog.Info.defineParameter(GROUP);
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter(USER);
- XLogStreamer.Filter.defineParameter(GROUP);
+ XLogFilter.reset();
+ XLogFilter.defineParameter(USER);
+ XLogFilter.defineParameter(GROUP);
// Getting configuration for oozie log via WS
ClassLoader cl = Thread.currentThread().getContextClassLoader();
@@ -282,7 +282,7 @@ public class XLogService implements Service, Instrumentable {
public void destroy() {
LogManager.shutdown();
XLog.Info.reset();
- XLogStreamer.Filter.reset();
+ XLogFilter.reset();
}
/**
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
index f77794e..1bf636b 100644
--- a/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.service;
+import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLogStreamer;
@@ -32,6 +33,7 @@ import java.util.Date;
public class XLogStreamingService implements Service, Instrumentable {
private static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService.";
public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
+
protected int bufferLen;
/**
@@ -78,7 +80,7 @@ public class XLogStreamingService implements Service, Instrumentable {
* @param params additional parameters from the request
* @throws IOException thrown if the log cannot be streamed.
*/
- public void streamLog(XLogStreamer.Filter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
+ public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
throws IOException {
XLogService xLogService = Services.get().get(XLogService.class);
if (xLogService.getLogOverWS()) {
@@ -88,7 +90,6 @@ public class XLogStreamingService implements Service, Instrumentable {
else {
writer.write("Log streaming disabled!!");
}
-
}
public int getBufferLen() {
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
index 8dc8b4b..7d53378 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
@@ -36,6 +36,7 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.util.AuthUrlClient;
+import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.SimpleTimestampedMessageParser;
import org.apache.oozie.util.TimestampedMessageParser;
import org.apache.oozie.util.XLog;
@@ -101,7 +102,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
* @throws IOException thrown if the log cannot be streamed.
*/
@Override
- public void streamLog(XLogStreamer.Filter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
+ public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
throws IOException {
XLogService xLogService = Services.get().get(XLogService.class);
if (xLogService.getLogOverWS()) {
@@ -132,7 +133,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
* @param writer
* @throws IOException
*/
- private void collateLogs(XLogStreamer.Filter filter, Date startTime, Date endTime, Writer writer) throws IOException {
+ private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer) throws IOException {
XLogService xLogService = Services.get().get(XLogService.class);
List<String> badOozies = new ArrayList<String>();
List<ServiceInstance<Map>> oozies = null;
@@ -176,6 +177,11 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
}
}
+ //If log param debug is set, we need to write start date and end date to outputstream.
+ if(filter.isDebugMode()){
+ writer.write(filter.getDebugMessage());
+ }
+
// Add a message about any servers we couldn't contact
if (!badOozies.isEmpty()) {
writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
index 5b65791..e03d7a7 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
@@ -267,15 +267,20 @@ public class V1JobServlet extends BaseJobServlet {
@Override
protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
IOException {
- String jobId = getResourceName(request);
- if (jobId.endsWith("-W")) {
- streamWorkflowJobLog(request, response);
- }
- else if (jobId.endsWith("-B")) {
- streamBundleJob(request, response);
+ try {
+ String jobId = getResourceName(request);
+ if (jobId.endsWith("-W")) {
+ streamWorkflowJobLog(request, response);
+ }
+ else if (jobId.endsWith("-B")) {
+ streamBundleJobLog(request, response);
+ }
+ else {
+ streamCoordinatorJobLog(request, response);
+ }
}
- else {
- streamCoordinatorJobLog(request, response);
+ catch (Exception e) {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0307, e.getMessage());
}
}
@@ -987,7 +992,7 @@ public class V1JobServlet extends BaseJobServlet {
* @param response servlet response
* @throws XServletException
*/
- private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
+ private void streamBundleJobLog(HttpServletRequest request, HttpServletResponse response)
throws XServletException, IOException {
BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
String jobId = getResourceName(request);
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/util/SimpleTimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/SimpleTimestampedMessageParser.java b/core/src/main/java/org/apache/oozie/util/SimpleTimestampedMessageParser.java
index 0a5e8d1..a44e8f6 100644
--- a/core/src/main/java/org/apache/oozie/util/SimpleTimestampedMessageParser.java
+++ b/core/src/main/java/org/apache/oozie/util/SimpleTimestampedMessageParser.java
@@ -27,7 +27,7 @@ import java.io.IOException;
*/
public class SimpleTimestampedMessageParser extends TimestampedMessageParser {
- public SimpleTimestampedMessageParser(BufferedReader reader, XLogStreamer.Filter filter) {
+ public SimpleTimestampedMessageParser(BufferedReader reader, XLogFilter filter) {
super(reader, filter);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java b/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
index 8a11780..8b7666d 100644
--- a/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
+++ b/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
@@ -20,8 +20,11 @@ package org.apache.oozie.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Writer;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import org.apache.commons.lang.StringUtils;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
@@ -40,10 +43,11 @@ public class TimestampedMessageParser {
protected BufferedReader reader;
private String nextLine = null;
private String lastTimestamp = null;
- private XLogStreamer.Filter filter;
+ private XLogFilter filter;
private boolean empty = false;
private String lastMessage = null;
private boolean patternMatched = false;
+ public int count = 0;
/**
* Creates a TimestampedMessageParser with the given BufferedReader and filter.
@@ -51,11 +55,11 @@ public class TimestampedMessageParser {
* @param reader The BufferedReader to get the log messages from
* @param filter The filter
*/
- public TimestampedMessageParser(BufferedReader reader, XLogStreamer.Filter filter) {
+ public TimestampedMessageParser(BufferedReader reader, XLogFilter filter) {
this.reader = reader;
this.filter = filter;
if (filter == null) {
- filter = new XLogStreamer.Filter();
+ filter = new XLogFilter();
}
filter.constructPattern();
}
@@ -140,6 +144,21 @@ public class TimestampedMessageParser {
patternMatched = filter.matches(logParts);
}
if (patternMatched) {
+ if (filter.getLogLimit() != -1) {
+ if (logParts != null) {
+ if (count >= filter.getLogLimit()) {
+ return null;
+ }
+ count++;
+ }
+ }
+ if (logParts != null) {
+ if (filter.getEndDate() != null) {
+ //Ignore the milli second part
+ if (logParts.get(0).substring(0, 19).compareTo(filter.getFormattedEndDate()) > 0)
+ return null;
+ }
+ }
return line;
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/util/XLogFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogFilter.java b/core/src/main/java/org/apache/oozie/util/XLogFilter.java
new file mode 100644
index 0000000..0007c76
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/XLogFilter.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.service.Services;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Filter that will construct the regular expression that will be used to filter the log statement. And also checks if
+ * the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by "|")
+ * jobId appName actionId token
+ */
+public class XLogFilter {
+
+ private static final int LOG_TIME_BUFFER = 2; // in min
+ public static String MAX_ACTIONLIST_SCAN_DURATION = "oozie.service.XLogStreamingService.actionlist.max.log.scan.duration";
+ public static String MAX_SCAN_DURATION = "oozie.service.XLogStreamingService.max.log.scan.duration";
+ private Map<String, Integer> logLevels;
+ private final Map<String, String> filterParams;
+ private static List<String> parameters = new ArrayList<String>();
+ private boolean noFilter;
+ private Pattern filterPattern;
+ private XLogUserFilterParam userLogFilter;
+ private Date endDate;
+ private Date startDate;
+ private boolean isActionList = false;
+ private String formattedEndDate;
+
+ // TODO Patterns to be read from config file
+ private static final String DEFAULT_REGEX = "[^\\]]*";
+
+ public static final String ALLOW_ALL_REGEX = "(.*)";
+ private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
+ private static final String WHITE_SPACE_REGEX = "\\s+";
+ private static final String LOG_LEVEL_REGEX = "(\\w+)";
+ private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
+ + WHITE_SPACE_REGEX;
+ private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
+
+ public XLogFilter() {
+ this(new XLogUserFilterParam());
+ }
+
+ public XLogFilter(XLogUserFilterParam userLogFilter) {
+ filterParams = new HashMap<String, String>();
+ for (int i = 0; i < parameters.size(); i++) {
+ filterParams.put(parameters.get(i), DEFAULT_REGEX);
+ }
+ logLevels = null;
+ noFilter = true;
+ filterPattern = null;
+ setUserLogFilter(userLogFilter);
+ }
+
+ public void setLogLevel(String logLevel) {
+ if (logLevel != null && logLevel.trim().length() > 0) {
+ this.logLevels = new HashMap<String, Integer>();
+ String[] levels = logLevel.split("\\|");
+ for (int i = 0; i < levels.length; i++) {
+ String s = levels[i].trim().toUpperCase();
+ try {
+ XLog.Level.valueOf(s);
+ }
+ catch (Exception ex) {
+ continue;
+ }
+ this.logLevels.put(levels[i].toUpperCase(), 1);
+ }
+ }
+ }
+
+ public void setParameter(String filterParam, String value) {
+ if (filterParams.containsKey(filterParam)) {
+ noFilter = false;
+ filterParams.put(filterParam, value);
+ }
+ }
+
+ public static void defineParameter(String filterParam) {
+ parameters.add(filterParam);
+ }
+
+ public boolean isFilterPresent() {
+ if (noFilter && logLevels == null) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Checks if the logLevel and logMessage goes through the logFilter.
+ *
+ * @param logParts
+ * @return
+ */
+ public boolean matches(ArrayList<String> logParts) {
+ String logLevel = logParts.get(1);
+ String logMessage = logParts.get(2);
+ if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
+ Matcher logMatcher = filterPattern.matcher(logMessage);
+ return logMatcher.matches();
+ }
+ else {
+ return false;
+ }
+ }
+
+ /**
+ * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing timestamp,
+ * logLevel, and logMessage if the pattern matches i.e A new log statement, else returns null.
+ *
+ * @param logLine
+ * @return Array containing log level and log message
+ */
+ public ArrayList<String> splitLogMessage(String logLine) {
+ Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
+ if (splitter.matches()) {
+ ArrayList<String> logParts = new ArrayList<String>();
+ logParts.add(splitter.group(1));// timestamp
+ logParts.add(splitter.group(2));// log level
+ logParts.add(splitter.group(3));// Log Message
+ return logParts;
+ }
+ else {
+ return null;
+ }
+ }
+
+ /**
+ * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be assigned
+ * if no filters are set.
+ */
+ public void constructPattern() {
+ if (noFilter && logLevels == null) {
+ filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
+ return;
+ }
+ StringBuilder sb = new StringBuilder();
+ if (noFilter) {
+ sb.append("(.*)");
+ }
+ else {
+ sb.append("(.* ");
+ for (int i = 0; i < parameters.size(); i++) {
+ sb.append(parameters.get(i) + "\\[");
+ sb.append(filterParams.get(parameters.get(i)) + "\\] ");
+ }
+ sb.append(".*)");
+ }
+ if (!StringUtils.isEmpty(userLogFilter.getSearchText())) {
+ sb.append(userLogFilter.getSearchText() + ".*");
+ }
+ filterPattern = Pattern.compile(sb.toString());
+ }
+
+ public static void reset() {
+ parameters.clear();
+ }
+
+ @VisibleForTesting
+ public final Map<String, String> getFilterParams() {
+ return filterParams;
+ }
+
+ public XLogUserFilterParam getUserLogFilter() {
+ return userLogFilter;
+ }
+
+ public void setUserLogFilter(XLogUserFilterParam userLogFilter) {
+ this.userLogFilter = userLogFilter;
+ setLogLevel(userLogFilter.getLogLevel());
+ }
+
+ public Date getEndDate() {
+ return endDate;
+ }
+
+ public String getFormattedEndDate() {
+ return formattedEndDate;
+ }
+
+
+ public Date getStartDate() {
+ return startDate;
+ }
+
+ public boolean isDebugMode() {
+ return userLogFilter.isDebug();
+ }
+
+ public int getLogLimit() {
+ return userLogFilter.getLimit();
+ }
+
+ public String getDebugMessage() {
+ return "Log start time = " + getStartDate() + ". Log end time = " + getEndDate() + ". User Log Filter = "
+ + getUserLogFilter() + System.getProperty("line.separator");
+ }
+
+ public boolean isActionList() {
+ return isActionList;
+ }
+
+ public void setActionList(boolean isActionList) {
+ this.isActionList = isActionList;
+ }
+
+ private void calculateScanDate(Date jobStartTime, Date jobEndTime) throws IOException {
+
+ if (userLogFilter.getStartDate() != null) {
+ startDate = userLogFilter.getStartDate();
+ }
+ else if (userLogFilter.getStartOffset() != -1) {
+ startDate = adjustOffset(jobStartTime, userLogFilter.getStartOffset());
+ }
+ else {
+ startDate = jobStartTime;
+ }
+
+ if (userLogFilter.getEndDate() != null) {
+ endDate = userLogFilter.getEndDate();
+ }
+ else if (userLogFilter.getEndOffset() != -1) {
+ // If user has specified startdate as absolute then end offset will be on user start date,
+ // else end offset will be calculated on job startdate.
+ if (userLogFilter.getStartDate() != null) {
+ endDate = adjustOffset(startDate, userLogFilter.getEndOffset());
+ }
+ else {
+ endDate = adjustOffset(jobStartTime, userLogFilter.getEndOffset());
+ }
+ }
+ else {
+ endDate = jobEndTime;
+ }
+ // if recent offset is specified then start time = endtime - offset
+ if (getUserLogFilter().getRecent() != -1) {
+ startDate = adjustOffset(endDate, userLogFilter.getRecent() * -1);
+ }
+
+ //add buffer iff dates are not asbsolute
+ if (userLogFilter.getStartDate() == null) {
+ startDate = adjustOffset(startDate, -LOG_TIME_BUFFER);
+ }
+ if (userLogFilter.getEndDate() == null) {
+ endDate = adjustOffset(endDate, LOG_TIME_BUFFER);
+ }
+
+ formattedEndDate = XLogUserFilterParam.dt.get().format(getEndDate());
+ }
+
+ /**
+ * Calculate and validate date range.
+ *
+ * @param jobStartTime the job start time
+ * @param jobEndTime the job end time
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public void calculateAndValidateDateRange(Date jobStartTime, Date jobEndTime) throws IOException {
+ // for testcase, otherwise jobStartTime and jobEndTime will be always set
+ if (jobStartTime == null || jobEndTime == null) {
+ return;
+ }
+ calculateScanDate(jobStartTime, jobEndTime);
+
+ if (startDate.after(endDate)) {
+ throw new IOException("Start time should be less than end time. startTime = " + startDate + " endtime = "
+ + endDate);
+ }
+ long diffHours = (endDate.getTime() - startDate.getTime()) / (60 * 60 * 1000);
+ if (isActionList) {
+ int actionLogDuration = Services.get().getConf().getInt(MAX_ACTIONLIST_SCAN_DURATION, -1);
+ if (actionLogDuration == -1) {
+ return;
+ }
+ if (diffHours > actionLogDuration) {
+ throw new IOException(
+ "Request log streaming time range with action list is higher than configured. Please reduce the scan "
+ + "time range. Input range (hours) = " + diffHours
+ + " system allowed (hours) with action list = " + actionLogDuration);
+ }
+ }
+ else {
+ int logDuration = Services.get().getConf().getInt(MAX_SCAN_DURATION, -1);
+ if (logDuration == -1) {
+ return;
+ }
+ if (diffHours > logDuration) {
+ throw new IOException(
+ "Request log streaming time range is higher than configured. Please reduce the scan time range. For coord"
+ + " jobs you can provide action list to reduce log scan time range. Input range (hours) = "
+ + diffHours + " system allowed (hours) = " + logDuration);
+ }
+ }
+ }
+
+ /**
+ * Adjust offset, offset will always be in min.
+ *
+ * @param date the date
+ * @param offset the offset
+ * @return the date
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public Date adjustOffset(Date date, int offset) throws IOException {
+ return org.apache.commons.lang.time.DateUtils.addMinutes(date, offset);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
index bc6b009..713219b 100644
--- a/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
+++ b/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
@@ -24,14 +24,8 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
-import com.google.common.annotations.VisibleForTesting;
-
import java.io.BufferedReader;
/**
@@ -39,154 +33,12 @@ import java.io.BufferedReader;
*/
public class XLogStreamer {
private static XLog LOG = XLog.getLog(XLogStreamer.class);
-
- /**
- * Filter that will construct the regular expression that will be used to filter the log statement. And also checks
- * if the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by
- * "|") jobId appName actionId token
- */
- public static class Filter {
- private Map<String, Integer> logLevels;
- private final Map<String, String> filterParams;
- private static List<String> parameters = new ArrayList<String>();
- private boolean noFilter;
- private Pattern filterPattern;
-
- // TODO Patterns to be read from config file
- private static final String DEFAULT_REGEX = "[^\\]]*";
-
- public static final String ALLOW_ALL_REGEX = "(.*)";
- private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
- private static final String WHITE_SPACE_REGEX = "\\s+";
- private static final String LOG_LEVEL_REGEX = "(\\w+)";
- private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
- + WHITE_SPACE_REGEX;
- private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
-
- public Filter() {
- filterParams = new HashMap<String, String>();
- for (int i = 0; i < parameters.size(); i++) {
- filterParams.put(parameters.get(i), DEFAULT_REGEX);
- }
- logLevels = null;
- noFilter = true;
- filterPattern = null;
- }
-
- public void setLogLevel(String logLevel) {
- if (logLevel != null && logLevel.trim().length() > 0) {
- this.logLevels = new HashMap<String, Integer>();
- String[] levels = logLevel.split("\\|");
- for (int i = 0; i < levels.length; i++) {
- String s = levels[i].trim().toUpperCase();
- try {
- XLog.Level.valueOf(s);
- }
- catch (Exception ex) {
- continue;
- }
- this.logLevels.put(levels[i].toUpperCase(), 1);
- }
- }
- }
-
- public void setParameter(String filterParam, String value) {
- if (filterParams.containsKey(filterParam)) {
- noFilter = false;
- filterParams.put(filterParam, value);
- }
- }
-
- public static void defineParameter(String filterParam) {
- parameters.add(filterParam);
- }
-
- public boolean isFilterPresent() {
- if (noFilter && logLevels == null) {
- return false;
- }
- return true;
- }
-
- /**
- * Checks if the logLevel and logMessage goes through the logFilter.
- *
- * @param logParts
- * @return
- */
- public boolean matches(ArrayList<String> logParts) {
- String logLevel = logParts.get(1);
- String logMessage = logParts.get(2);
- if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
- Matcher logMatcher = filterPattern.matcher(logMessage);
- return logMatcher.matches();
- }
- else {
- return false;
- }
- }
-
- /**
- * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing timestamp, logLevel, and
- * logMessage if the pattern matches i.e A new log statement, else returns null.
- *
- * @param logLine
- * @return Array containing log level and log message
- */
- public ArrayList<String> splitLogMessage(String logLine) {
- Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
- if (splitter.matches()) {
- ArrayList<String> logParts = new ArrayList<String>();
- logParts.add(splitter.group(1));// timestamp
- logParts.add(splitter.group(2));// log level
- logParts.add(splitter.group(3));// Log Message
- return logParts;
- }
- else {
- return null;
- }
- }
-
- /**
- * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be
- * assigned if no filters are set.
- */
- public void constructPattern() {
- if (noFilter && logLevels == null) {
- filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
- return;
- }
- StringBuilder sb = new StringBuilder();
- if (noFilter) {
- sb.append("(.*)");
- }
- else {
- sb.append("(.* ");
- for (int i = 0; i < parameters.size(); i++) {
- sb.append(parameters.get(i) + "\\[");
- sb.append(filterParams.get(parameters.get(i)) + "\\] ");
- }
- sb.append(".*)");
- }
- filterPattern = Pattern.compile(sb.toString());
- }
-
- public static void reset() {
- parameters.clear();
- }
-
- @VisibleForTesting
- public final Map<String, String> getFilterParams() {
- return filterParams;
- }
- }
-
private String logFile;
private String logPath;
- private Filter logFilter;
+ private XLogFilter logFilter;
private long logRotation;
- public XLogStreamer(Filter logFilter, String logPath, String logFile, long logRotationSecs) {
+ public XLogStreamer(XLogFilter logFilter, String logPath, String logFile, long logRotationSecs) {
this.logFilter = logFilter;
if (logFile == null) {
logFile = "oozie-app.log";
@@ -207,8 +59,11 @@ public class XLogStreamer {
*/
public void streamLog(Writer writer, Date startTime, Date endTime, int bufferLen) throws IOException {
// Get a Reader for the log file(s)
- BufferedReader reader = makeReader(startTime, endTime);
+ BufferedReader reader = new BufferedReader(getReader(startTime, endTime));
try {
+ if(logFilter.isDebugMode()){
+ writer.write(logFilter.getDebugMessage());
+ }
// Process the entire logs from the reader using the logFilter
new TimestampedMessageParser(reader, logFilter).processRemaining(writer, bufferLen);
}
@@ -225,7 +80,25 @@ public class XLogStreamer {
* @return A BufferedReader for the log files
* @throws IOException
*/
+
+ private MultiFileReader getReader(Date startTime, Date endTime) throws IOException {
+ logFilter.calculateAndValidateDateRange(startTime, endTime);
+ return new MultiFileReader(getFileList(logFilter.getStartDate(), logFilter.getEndDate()));
+ }
+
public BufferedReader makeReader(Date startTime, Date endTime) throws IOException {
+ return new BufferedReader(getReader(startTime,endTime));
+ }
+
+ /**
+ * Gets the log file list for specific date range.
+ *
+ * @param startTime the start time
+ * @param endTime the end time
+ * @return log file list
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ private ArrayList<File> getFileList(Date startTime, Date endTime) throws IOException {
long startTimeMillis = 0;
long endTimeMillis;
if (startTime != null) {
@@ -238,10 +111,7 @@ public class XLogStreamer {
endTimeMillis = endTime.getTime();
}
File dir = new File(logPath);
- ArrayList<File> files = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
- // The MultiFileReader is a Reader that treats the files as one source so we can easily go through them all with one
- // BufferedReader
- return new BufferedReader(new MultiFileReader(files));
+ return getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
}
/**
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/util/XLogUserFilterParam.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogUserFilterParam.java b/core/src/main/java/org/apache/oozie/util/XLogUserFilterParam.java
new file mode 100644
index 0000000..6e49e11
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/XLogUserFilterParam.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+
+public class XLogUserFilterParam {
+
+ public static final String START_TIME = "START";
+
+ public static final String END_TIME = "END";
+
+ public static final String SEARCH_TEXT = "TEXT";
+
+ public static final String LOG_LEVEL = "LOGLEVEL";
+
+ public static final String LIMIT = "LIMIT";
+
+ public static final String RECENT_LOG_OFFSET = "RECENT";
+
+ public static final String DEBUG = "DEBUG";
+
+ private Date startTime;
+ private Date endTime;
+ private int startOffset;
+ private int endOffset = -1;
+ private int recent = -1;
+ private String logLevel;
+ private int limit = -1;
+ private boolean isDebug = false;
+ private String searchText;
+
+ public static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() {
+ @Override
+ protected SimpleDateFormat initialValue() {
+ return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ }
+ };
+
+ static final HashSet<String> LOG_LEVELS = new HashSet<String>();
+ static {
+ LOG_LEVELS.add("ALL");
+ LOG_LEVELS.add("DEBUG");
+ LOG_LEVELS.add("ERROR");
+ LOG_LEVELS.add("INFO");
+ LOG_LEVELS.add("TRACE");
+ LOG_LEVELS.add("WARN");
+ LOG_LEVELS.add("FATAL");
+ }
+
+ public XLogUserFilterParam() {
+ }
+
+ /**
+ * Instantiates a new log user param.
+ *
+ * @param params the params
+ * @throws CommandException the command exception
+ */
+ public XLogUserFilterParam(Map<String, String[]> params) throws CommandException {
+ if (params != null && params.get(RestConstants.LOG_FILTER_OPTION) != null
+ && params.get(RestConstants.LOG_FILTER_OPTION).length > 0) {
+ try {
+ parseFilterParam(params.get(RestConstants.LOG_FILTER_OPTION)[0]);
+ }
+ catch (Exception e) {
+ throw new CommandException(ErrorCode.E0302, e.getMessage());
+
+ }
+ }
+ }
+
+ /**
+ * Parse filter param
+ *
+ * @param param the param
+ * @param map the map
+ * @throws Exception
+ */
+ private void parseFilterParam(String param) throws Exception {
+ if (StringUtils.isEmpty(param)) {
+ return;
+ }
+ for (String keyValue : param.split(";")) {
+ String[] pairs = keyValue.split("=");
+ String key = pairs[0].toUpperCase();
+ String value = pairs.length == 1 ? "" : pairs[1];
+ if (key.equals(START_TIME)) {
+ startTime = getDate(value);
+ if (startTime == null) {
+ startOffset = getOffsetInMinute(value);
+ }
+ }
+ else if (key.equals(END_TIME)) {
+ endTime = getDate(value);
+ if (endTime == null) {
+ endOffset = getOffsetInMinute(value);
+ }
+
+ }
+ else if (key.equals(RECENT_LOG_OFFSET)) {
+ recent = getOffsetInMinute(value);
+ }
+ else if (key.equals(LIMIT)) {
+ limit = Integer.parseInt(value);
+
+ }
+ else if (key.equals(LOG_LEVEL)) {
+ logLevel = value;
+ validateLogLevel(logLevel);
+
+ }
+ else if (key.equals(DEBUG)) {
+ isDebug = true;
+
+ }
+ else if (key.equals(SEARCH_TEXT)) {
+ searchText = value;
+
+ }
+ else {
+ throw new Exception("Unsupported log filter " + key);
+ }
+ }
+ }
+
+ /**
+ * Gets the log level.
+ *
+ * @return the log level
+ */
+ public String getLogLevel() {
+ return logLevel;
+
+ }
+
+ /**
+ * Gets the start date.
+ *
+ * @return the start date
+ */
+ public Date getStartDate() {
+ return startTime;
+ }
+
+ /**
+ * Gets the end date.
+ *
+ * @return the end date
+ */
+ public Date getEndDate() {
+ return endTime;
+ }
+
+ /**
+ * Gets the search text.
+ *
+ * @return the search text
+ */
+ public String getSearchText() {
+ return searchText;
+ }
+
+ /**
+ * Validate log level.
+ *
+ * @throws CommandException
+ */
+ public void validateLogLevel(String loglevel) throws CommandException {
+ if (StringUtils.isEmpty(loglevel)) {
+ return;
+ }
+ for (String level : getLogLevel().split("\\|")) {
+ if (!LOG_LEVELS.contains(level)) {
+ throw new CommandException(ErrorCode.E0302, "Supported log level are " + LOG_LEVELS.toString());
+ }
+ }
+ }
+
+ /**
+ * Validate search text.
+ *
+ * @throws CommandException the command exception
+ */
+ public void validateSearchText() throws CommandException {
+ // No restriction on search text.
+
+ }
+
+ /**
+ * Gets the date. Date can in TZ or yyyy-MM-dd HH:mm:ss,SSS format
+ *
+ * @param String date
+ * @return the date
+ */
+ public Date getDate(String date) {
+ try {
+ return DateUtils.parseDateOozieTZ(date);
+ }
+ catch (ParseException e) {
+ try {
+ return dt.get().parse(date);
+ }
+ catch (ParseException e1) {
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Checks if is debug.
+ *
+ * @return true, if it's debug
+ */
+ public boolean isDebug() {
+ return isDebug;
+ }
+
+ public Date getEndTime() {
+ return endTime;
+ }
+
+ public int getEndOffset() {
+ return endOffset;
+ }
+
+ public int getRecent() {
+ return recent;
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public int getStartOffset() {
+ return startOffset;
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append(START_TIME).append("=").append(getStartDate()).append(";");
+ sb.append(END_TIME).append("=").append(getEndDate()).append(";");
+ sb.append(LOG_LEVEL).append("=").append(getLogLevel()).append(";");
+ sb.append(LIMIT).append("=").append(getLimit()).append(";");
+ sb.append(RECENT_LOG_OFFSET).append("=").append(getRecent()).append(";");
+ return sb.toString();
+ }
+
+ private int getOffsetInMinute(String offset) throws IOException {
+
+ if (Character.isLetter(offset.charAt(offset.length() - 1))) {
+ switch (offset.charAt(offset.length() - 1)) {
+ case 'H':
+ case 'h':
+ return Integer.parseInt(offset.substring(0, offset.length() - 1)) * 60;
+ case 'M':
+ case 'm':
+ return Integer.parseInt(offset.substring(0, offset.length() - 1));
+ default:
+ throw new IOException("Unsupported offset " + offset);
+ }
+ }
+ else {
+ if (StringUtils.isNumeric(offset)) {
+ return Integer.parseInt(offset) * 60;
+ }
+ else {
+ throw new IOException("Unsupported time : " + offset);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 70620f6..7ea0d7b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2052,4 +2052,25 @@
wf.name=,action.name=,action.type=,launcher=true"
</description>
</property>
+
+ <property>
+ <name>oozie.service.XLogStreamingService.max.log.scan.duration</name>
+ <value>-1</value>
+ <description>
+ Max log scan duration in hours. If log scan request end_date - start_date > value,
+ then exception is thrown to reduce the scan duration. -1 indicate no limit.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.XLogStreamingService.max.actionlist.log.scan.duration</name>
+ <value>-1</value>
+ <description>
+ Max log scan duration in hours for coordinator job when list of actions are specified.
+ If log streaming request end_date - start_date > value, then exception is thrown to reduce the scan duration.
+ -1 indicate no limit.
+ This setting is separate from max.log.scan.duration as we want to allow higher durations when actions are specified.
+ </description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
index b4f161a..d711dd9 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
@@ -43,9 +43,10 @@ import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.test.XFsTestCase;
import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XLogUserFilterParam;
import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XLogStreamer.Filter;
public class TestCoordinatorEngineStreamLog extends XFsTestCase {
private Services services;
@@ -81,12 +82,12 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase {
}
static class DummyXLogStreamingService extends XLogStreamingService {
- Filter filter;
+ XLogFilter filter;
Date startTime;
Date endTime;
@Override
- public void streamLog(Filter filter1, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
+ public void streamLog(XLogFilter filter1, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
throws IOException {
filter = filter1;
this.startTime = startTime;
@@ -120,7 +121,7 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase {
// Test 1.to test if fields are injected
ce.streamLog(jobId, new StringWriter(), new HashMap<String, String[]>());
DummyXLogStreamingService service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
- Filter filter = service.filter;
+ XLogFilter filter = service.filter;
assertEquals(filter.getFilterParams().get(DagXLogInfoService.JOB), jobId);
assertTrue(endDate.before(service.endTime));
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
index c03385d..145ddbe 100644
--- a/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
@@ -19,7 +19,9 @@ package org.apache.oozie.service;
import org.apache.commons.logging.LogFactory;
import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XLogUserFilterParam;
import java.io.File;
import java.io.FileOutputStream;
@@ -27,7 +29,6 @@ import java.io.InputStream;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Properties;
-import org.apache.oozie.util.XLogStreamer;
public class TestXLogStreamingService extends XTestCase {
@@ -128,14 +129,15 @@ public class TestXLogStreamingService extends XTestCase {
}
public void testNoDashInConversionPattern() throws Exception{
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter(new XLogUserFilterParam(null));
+
xf.setParameter("USER", "oozie");
xf.setLogLevel("DEBUG|INFO");
// Previously, a dash ("-") was always required somewhere in a line in order for that line to pass the filter; this test
@@ -187,10 +189,12 @@ public class TestXLogStreamingService extends XTestCase {
}
private boolean doStreamDisabledCheck() throws Exception {
- return doStreamLog(null).equals("Log streaming disabled!!");
+ XLogFilter xf = new XLogFilter(new XLogUserFilterParam(null));
+
+ return doStreamLog(xf).equals("Log streaming disabled!!");
}
- private String doStreamLog(XLogStreamer.Filter xf) throws Exception {
+ private String doStreamLog(XLogFilter xf) throws Exception {
StringWriter w = new StringWriter();
Services.get().get(XLogStreamingService.class).streamLog(xf, null, null, w, new HashMap<String, String[]>());
return w.toString();
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
index 29bca41..fd48951 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
@@ -28,8 +28,8 @@ import java.util.Properties;
import org.apache.commons.logging.LogFactory;
import org.apache.oozie.test.EmbeddedServletContainer;
import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.ZKUtils;
public class TestZKXLogStreamingService extends ZKXTestCase {
@@ -146,14 +146,14 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
}
public void testNoDashInConversionPattern() throws Exception{
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
xf.setParameter("USER", "oozie");
xf.setLogLevel("DEBUG|INFO");
// Previously, a dash ("-") was always required somewhere in a line in order for that line to pass the filter; this test
@@ -188,10 +188,10 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
private boolean doStreamDisabledCheck() throws Exception {
Services.get().get(XLogService.class).init(Services.get());
- return doStreamLog(new XLogStreamer.Filter()).equals("Log streaming disabled!!");
+ return doStreamLog(new XLogFilter()).equals("Log streaming disabled!!");
}
- protected String doStreamLog(XLogStreamer.Filter xf) throws Exception {
+ protected String doStreamLog(XLogFilter xf) throws Exception {
StringWriter w = new StringWriter();
ZKXLogStreamingService zkxlss = new ZKXLogStreamingService();
try {
@@ -210,14 +210,14 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
}
public void testStreamingWithMultipleOozieServers() throws Exception {
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
xf.setParameter("JOB", "0000003-130610102426873-oozie-rkan-W");
xf.setLogLevel("WARN|INFO");
File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java b/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
index 659949f..bc19341 100644
--- a/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
+++ b/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
@@ -27,6 +27,10 @@ import java.util.Calendar;
import java.util.Date;
import java.util.zip.GZIPOutputStream;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
import org.apache.oozie.test.XTestCase;
public class TestLogStreamer extends XTestCase {
@@ -36,16 +40,18 @@ public class TestLogStreamer extends XTestCase {
private final static SimpleDateFormat filenameDateFormatter = new SimpleDateFormat("yyyy-MM-dd-HH");
- public void testStreamLog() throws IOException {
+ public void testStreamLog() throws IOException, CommandException, ServiceException {
+ new Services().init();
+
long currTime = System.currentTimeMillis();
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
xf.setLogLevel("DEBUG|INFO");
@@ -138,6 +144,9 @@ public class TestLogStreamer extends XTestCase {
// Test for the log retrieval of the job that began 10 hours before and ended 5 hours before current time
// respectively
StringWriter sw = new StringWriter();
+ xf = new XLogFilter();
+ xf.setLogLevel("DEBUG|INFO");
+ xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
str.streamLog(sw, new Date(currTime - 10 * 3600000), new Date(currTime - 5 * 3600000), 4096);
String[] out = sw.toString().split("\n");
@@ -156,6 +165,9 @@ public class TestLogStreamer extends XTestCase {
// Test to check if the null values for startTime and endTime are translated to 0 and current time respectively
// and corresponding log content is retrieved properly
StringWriter sw1 = new StringWriter();
+ xf = new XLogFilter();
+ xf.setLogLevel("DEBUG|INFO");
+ xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
XLogStreamer str1 = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
str1.streamLog(sw1, null, null, 4096);
out = sw1.toString().split("\n");
@@ -173,20 +185,21 @@ public class TestLogStreamer extends XTestCase {
assertEquals(true, out[7].contains("_L7_"));
}
- public void testStreamLogMultipleHours() throws IOException {
-
+ public void testStreamLogMultipleHours() throws IOException, CommandException, ServiceException {
+ new Services().init();
long currTime = System.currentTimeMillis();
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
xf.setLogLevel("DEBUG|INFO");
+
// Test to check if all gz log files in the range jobStartTime-currentTime are retrieved
String outFilename = "oozie.log-2012-04-24-19.gz";
File f = new File(getTestCaseDir() + "/" + outFilename);
@@ -220,6 +233,9 @@ public class TestLogStreamer extends XTestCase {
// Test for the log retrieval of the job spanning multiple hours
StringWriter sw2 = new StringWriter();
+ xf = new XLogFilter();
+ xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
+ xf.setLogLevel("DEBUG|INFO");
XLogStreamer str2 = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
Calendar calendarEntry = Calendar.getInstance();
// Setting start-time to 2012-04-24-19 for log stream (month-1 passed as parameter since 0=January), and end time is current time
@@ -239,16 +255,17 @@ public class TestLogStreamer extends XTestCase {
assertEquals(true, out[4].contains("_L23_"));
}
- public void testStreamLogNoDash() throws IOException {
+ public void testStreamLogNoDash() throws IOException, CommandException, ServiceException {
+ new Services().init();
long currTime = System.currentTimeMillis();
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
xf.setLogLevel("DEBUG|INFO");
@@ -272,6 +289,10 @@ public class TestLogStreamer extends XTestCase {
f1.setLastModified(currTime);
StringWriter sw = new StringWriter();
+ xf = new XLogFilter();
+ xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
+ xf.setLogLevel("DEBUG|INFO");
+
XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
str.streamLog(sw, new Date(currTime - 5000), new Date(currTime + 5000), 4096);
String[] out = sw.toString().split("\n");
http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java b/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
index 75341aa..8deac0b 100644
--- a/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
+++ b/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
@@ -28,14 +28,14 @@ import org.apache.oozie.test.XTestCase;
public class TestSimplifiedTimestampedMessageParser extends XTestCase {
public void testProcessRemainingLog() throws IOException {
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
xf.setLogLevel("DEBUG|WARN");
@@ -66,14 +66,14 @@ public class TestSimplifiedTimestampedMessageParser extends XTestCase {
}
public void testProcessRemainingCoordinatorLogForActions() throws IOException {
- XLogStreamer.Filter.reset();
- XLogStreamer.Filter.defineParameter("USER");
- XLogStreamer.Filter.defineParameter("GROUP");
- XLogStreamer.Filter.defineParameter("TOKEN");
- XLogStreamer.Filter.defineParameter("APP");
- XLogStreamer.Filter.defineParameter("JOB");
- XLogStreamer.Filter.defineParameter("ACTION");
- XLogStreamer.Filter xf = new XLogStreamer.Filter();
+ XLogFilter.reset();
+ XLogFilter.defineParameter("USER");
+ XLogFilter.defineParameter("GROUP");
+ XLogFilter.defineParameter("TOKEN");
+ XLogFilter.defineParameter("APP");
+ XLogFilter.defineParameter("JOB");
+ XLogFilter.defineParameter("ACTION");
+ XLogFilter xf = new XLogFilter();
xf.setParameter("JOB", "14-200904160239--example-C");
xf.setParameter("ACTION", "14-200904160239--example-C@1");