You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2014/04/24 23:42:26 UTC

[1/2] OOZIE-1737 Oozie log streaming is slow (puru via rohini)

Repository: oozie
Updated Branches:
  refs/heads/master 2ef171156 -> f9176a01b


http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java b/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
index c2630e7..f9f1dd4 100644
--- a/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
+++ b/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
@@ -25,6 +25,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
+
+import org.apache.oozie.command.CommandException;
 import org.apache.oozie.test.XTestCase;
 
 public class TestTimestampedMessageParser extends XTestCase {
@@ -113,16 +115,16 @@ public class TestTimestampedMessageParser extends XTestCase {
         return file;
     }
 
-    public void testNofindLogs() {
+    public void testNofindLogs() throws CommandException {
         // Test of OOZIE-1691
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
         xf.setParameter("JOB", "14-200904160239--no-found-C");
         xf.setLogLevel("DEBUG|WARN");
         try {
@@ -136,18 +138,18 @@ public class TestTimestampedMessageParser extends XTestCase {
         }
     }
 
-    public void testProcessRemainingLog() throws IOException {
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+    public void testProcessRemainingLog() throws IOException, CommandException {
+
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
         xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
         xf.setLogLevel("DEBUG|WARN");
-
         File file = prepareFile1(getTestCaseDir());
         StringWriter sw = new StringWriter();
         new TimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw, 4096);
@@ -169,15 +171,15 @@ public class TestTimestampedMessageParser extends XTestCase {
         assertTrue(out[13].contains("_L17_"));
     }
 
-    public void testProcessRemainingCoordinatorLogForActions() throws IOException {
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+    public void testProcessRemainingCoordinatorLogForActions() throws IOException, CommandException {
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
         xf.setParameter("JOB", "14-200904160239--example-C");
         xf.setParameter("ACTION", "14-200904160239--example-C@1");
 
@@ -191,8 +193,8 @@ public class TestTimestampedMessageParser extends XTestCase {
     }
 
     public void testLifecycle() throws Exception {
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter xf = new XLogFilter();
         String str1 = "2009-06-24 02:43:13,958 DEBUG _L1_:323 - USER[oozie] GROUP[-] TOKEN[-] APP[example-forkjoinwf] "
                 + "JOB[14-200904160239--example-forkjoinwf] ACTION[-] End workflow state change\n";
         String str2 = "2009-06-24 02:43:13,961  INFO _L2_:317 - USER[-] GROUP[-] TOKEN[-] APP[example-forkjoinwf] "

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java b/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java
index 8d0fd71..5301784 100644
--- a/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java
+++ b/core/src/test/java/org/apache/oozie/util/TestXLogFilter.java
@@ -17,20 +17,20 @@
  */
 package org.apache.oozie.util;
 
+import org.apache.oozie.command.CommandException;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.util.XLogStreamer;
 
 import java.util.ArrayList;
 
 import org.apache.oozie.test.XTestCase;
 
 public class TestXLogFilter extends XTestCase {
-    public void testXLogFileter() throws ServiceException {
+    public void testXLogFilter() throws ServiceException, CommandException {
         Services services = new Services();
         services.init();
         try {
-            XLogStreamer.Filter xf2 = new XLogStreamer.Filter();
+            XLogFilter xf2 = new XLogFilter();
             xf2.constructPattern();
             ArrayList<String> a = new ArrayList<String>();
             a.add("2009-06-24 02:43:13,958");
@@ -42,14 +42,15 @@ public class TestXLogFilter extends XTestCase {
             services.destroy();
         }
 
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
+
 
         assertEquals(7, matches(xf));
         xf.setLogLevel(XLog.Level.WARN.toString());
@@ -63,7 +64,7 @@ public class TestXLogFilter extends XTestCase {
         xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
         assertEquals(2, matches(xf));
 
-        XLogStreamer.Filter xf1 = new XLogStreamer.Filter();
+        XLogFilter xf1 = new XLogFilter();
         xf1.setParameter("USER", "oozie");
         assertEquals(3, matches(xf1));
 
@@ -74,7 +75,7 @@ public class TestXLogFilter extends XTestCase {
         assertEquals(1, matches(xf1));
     }
 
-    private int matches(XLogStreamer.Filter xf) {
+    private int matches(XLogFilter xf) {
         xf.constructPattern();
         ArrayList<String> a = new ArrayList<String>();
         a.add("2009-06-24 02:43:13,958 DEBUG WorkflowRunnerCallable:323 - USER[oozie] GROUP[-] TOKEN[-] APP[example-forkjoinwf] JOB[14-200904160239--example-forkjoinwf] ACTION[-] End workflow state change");

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java b/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java
new file mode 100644
index 0000000..c769883
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
+import org.apache.oozie.service.XLogStreamingService;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XLogUserFilterParam;
+
+public class TestXLogUserFilterParam extends XTestCase {
+
+    final static SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    public void setLogFile() throws Exception {
+        XLogFilter.reset();
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+        InputStream errorLogStream = cl.getResourceAsStream("userLogFilterTestlog.log");
+
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        // prevent conflicts with other tests by changing the log file location
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+        new Services().init();
+        File logFile = new File(Services.get().get(XLogService.class).getOozieLogPath(), Services.get()
+                .get(XLogService.class).getOozieLogName());
+        IOUtils.copyStream(errorLogStream, new FileOutputStream(logFile));
+    }
+
+    public void testloglevel_Error() throws Exception {
+        setLogFile();
+        String logLevel = XLogUserFilterParam.LOG_LEVEL + "=ERROR";
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { logLevel });
+
+        XLogFilter xf = new XLogFilter(new XLogUserFilterParam(paramMap));
+        String out = doStreamLog(xf);
+        int count = 0;
+        for (String line : out.split(System.getProperty("line.separator"))) {
+            assertFalse(line.contains("DEBUG"));
+            assertFalse(line.contains("INFO"));
+            count++;
+        }
+        assertEquals(count, 20);
+    }
+
+    // Test multiple log levels
+    public void testloglevel_DEBUF_INFO() throws Exception {
+        setLogFile();
+
+        String logLevel = XLogUserFilterParam.LOG_LEVEL + "=DEBUG|INFO";
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { logLevel });
+        XLogFilter xf = new XLogFilter(new XLogUserFilterParam(paramMap));
+        String out = doStreamLog(xf);
+
+        String lines[] = out.split(System.getProperty("line.separator"));
+
+        assertEquals(lines.length, 4);
+
+        assertTrue(lines[1]
+                .startsWith("2014-02-27 02:06:49,215 DEBUG CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] "
+                        + "TOKEN[-] APP[-]"));
+        assertTrue(lines[2]
+                .startsWith("2014-02-27 02:06:49,215 DEBUG CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] "
+                        + "TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C]"));
+        assertTrue(lines[3]
+                .startsWith("2014-02-27 02:06:49,215 INFO CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] "
+                        + "TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C]"));
+
+    }
+
+    // Test log level combined with a limit
+    public void testloglevel_ErrorWithLen() throws Exception {
+        setLogFile();
+        String logLevel = XLogUserFilterParam.LOG_LEVEL + "=ERROR;" + XLogUserFilterParam.LIMIT + "=1";
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { logLevel });
+
+        XLogFilter xf = new XLogFilter(new XLogUserFilterParam(paramMap));
+        String out = doStreamLog(xf);
+        String lines[] = out.split(System.getProperty("line.separator"));
+
+        assertEquals(lines.length, 1);
+    }
+
+    // test length
+    public void testLength() throws Exception {
+        setLogFile();
+
+        String logLevel = XLogUserFilterParam.LIMIT + "=1";
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { logLevel });
+        XLogFilter xf = new XLogFilter(new XLogUserFilterParam(paramMap));
+        String out = doStreamLog(xf);
+        String lines[] = out.split(System.getProperty("line.separator"));
+
+        assertEquals(lines.length, 1);
+    }
+
+    // Test text search
+    public void testTextSearch() throws Exception {
+        setLogFile();
+        XLogFilter filter = new XLogFilter();
+
+        String param = XLogUserFilterParam.SEARCH_TEXT + "=substitution;" + XLogUserFilterParam.LIMIT + "=2";
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+
+        XLogUserFilterParam logUtil = new XLogUserFilterParam(paramMap);
+        filter.setUserLogFilter(logUtil);
+        String out = doStreamLog(filter);
+        String lines[] = out.split(System.getProperty("line.separator"));
+        assertEquals(lines.length, 2);
+
+        assertTrue(lines[0].contains("E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}"));
+        assertTrue(lines[1].contains("E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}"));
+
+    }
+
+    // Test text search with log level
+    public void testsearchText_logLevel() throws Exception {
+        setLogFile();
+        String param = XLogUserFilterParam.SEARCH_TEXT + "=substitution;" + XLogUserFilterParam.LOG_LEVEL + "=DEBUG";
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+
+        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+        String out = doStreamLog(filter);
+        String lines[] = out.split(System.getProperty("line.separator"));
+        assertEquals(lines.length, 1);
+
+        assertTrue(lines[0]
+                .contains("2014-02-27 02:06:47,499 DEBUG CoordActionStartXCommand:536 [pool-2-thread-236] - USER[-]"));
+        assertTrue(lines[0].contains("E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}"));
+    }
+
+    // Test log duration - out of range
+    public void testException() throws Exception {
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        // prevent conflicts with other tests by changing the log file location
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+        new Services().init();
+        Services.get().getConf().setInt(XLogFilter.MAX_SCAN_DURATION, 10);
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] {});
+        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+        Date startDate = new Date();
+        Date endDate = new Date(startDate.getTime() + 60 * 60 * 1000 * 11);
+
+        try {
+            doStreamLog(filter, startDate, endDate);
+            fail("should not come here");
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            assertTrue(e.getMessage().contains(
+                    "Request log streaming time range is higher than configured."));
+        }
+    }
+
+    // Test log duration - in range
+    public void testNoException_Withrecent() throws Exception {
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        // prevent conflicts with other tests by changing the log file location
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+        new Services().init();
+        Services.get().getConf().setInt(XLogFilter.MAX_SCAN_DURATION, 10);
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        String param = XLogUserFilterParam.RECENT_LOG_OFFSET + "=9";
+
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+        Date startDate = new Date();
+        Date endDate = new Date(startDate.getTime() + 60 * 60 * 1000 * 15);
+
+        try {
+            doStreamLog(filter, startDate, endDate);
+        }
+        catch (Exception e) {
+            fail("should not throw exception");
+        }
+
+    }
+
+    // Test a combination of multiple filter options
+    public void testCombination() throws Exception {
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+        new Services().init();
+
+        XLogFilter.reset();
+
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        String param = "start=14-02-20 02:06:25,499;end=14-02-27 02:06:47,550;debug;loglevel=ERROR|WARN;recent=3m";
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+        // Param date will be overwritten by user param
+        String out = doStreamLog(filter, new Date(), new Date());
+        assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Feb 27 02:03:47"));
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Feb 27 02:06:47"));
+
+    }
+
+    // Test start and end date, start as absolute
+    public void testStartEnd_EndOffset() throws Exception {
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        // prevent conflicts with other tests by changing the log file location
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+        new Services().init();
+        XLogFilter.reset();
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        String param = "start=14-02-20 02:06:25,499;end=3m;debug";
+
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+
+        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+        // Param date will be overwritten by user param
+        String out = doStreamLog(filter, dt.parse("14-02-20 02:06:25,499"), new Date());
+        assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Feb 20 02:06:25"));
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Feb 20 02:11:25"));
+
+    }
+
+    // Test start and end date, both offset
+    public void testStartEnd_bothOffset() throws Exception {
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        // prevent conflicts with other tests by changing the log file location
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+        new Services().init();
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        String param = "start=3m;end=13m;debug";
+
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+
+        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+        // Param date will be overwritten by user param
+        String out = doStreamLog(filter, dt.parse("14-02-20 02:06:25,499"), new Date());
+        assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Feb 20 02:07:25"));
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Feb 20 02:21:25"));
+
+    }
+
+    // Test start and end date, both absolute
+    public void testStartEnd_bothabsoulte() throws Exception {
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        // prevent conflicts with other tests by changing the log file location
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+        new Services().init();
+
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        String param = "start=14-03-20 02:06:25,499;end=14-03-20 02:10:25,499;debug";
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+        // Param date will be overwritten by user param
+        String out = doStreamLog(filter, dt.parse("14-01-20 02:06:25,499"), dt.parse("14-02-20 02:06:25,499"));
+        assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Mar 20 02:06:25"));
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Mar 20 02:10:25"));
+
+        paramMap = new HashMap<String, String[]>();
+        param = "start=14-03-20 02:06:25;end=14-03-20 02:10:25;debug";
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+        filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+        // Param date will be overwritten by user param
+        out = doStreamLog(filter, dt.parse("14-01-20 02:06:25,499"), dt.parse("14-02-20 02:06:25,499"));
+        assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Mar 20 02:06:25"));
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Mar 20 02:10:25"));
+
+
+    }
+
+    // Test start and end date, start absolute and end as offset
+    public void testStartEnd_startAbsolute() throws Exception {
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        // prevent conflicts with other tests by changing the log file location
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+        new Services().init();
+        Map<String, String[]> paramMap = new HashMap<String, String[]>();
+        String param = "start=14-03-20 02:06:25,499;end=4m;debug";
+        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] { param });
+        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
+        // Param date will be overwritten by user param
+        String out = doStreamLog(filter, dt.parse("14-01-20 02:06:25,499"), dt.parse("14-02-20 02:06:25,499"));
+        assertEquals(out.split(System.getProperty("line.separator")).length, 1);
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log start time = Tue Mar 20 02:06:25"));
+        assertTrue(out.split(System.getProperty("line.separator"))[0].contains("Log end time = Tue Mar 20 02:12:25"));
+
+    }
+
+    private String doStreamLog(XLogFilter xf) throws Exception {
+        return doStreamLog(xf, null, null);
+    }
+
+    private String doStreamLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
+        StringWriter w = new StringWriter();
+        Services.get().get(XLogStreamingService.class)
+                .streamLog(xf, startDate, endDate, w, new HashMap<String, String[]>());
+        return w.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/resources/userLogFilterTestlog.log
----------------------------------------------------------------------
diff --git a/core/src/test/resources/userLogFilterTestlog.log b/core/src/test/resources/userLogFilterTestlog.log
new file mode 100644
index 0000000..bf89038
--- /dev/null
+++ b/core/src/test/resources/userLogFilterTestlog.log
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+2014-02-27 02:06:47,499 DEBUG CoordActionStartXCommand:536 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Failing the action 0601839-140127221758655-oozie-wrkf-C@2. Because E0803 : E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+2014-02-27 02:06:47,499 ERROR CoordActionStartXCommand:536 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758656-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Failing the action 0601839-140127221758655-oozie-wrkf-C@2. Because E0803 : E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+2014-02-27 02:06:47,499 ERROR SubmitXCommand:536 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] XException,
+org.apache.oozie.command.CommandException: E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+    at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:252)
+    at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:72)
+    at org.apache.oozie.command.XCommand.call(XCommand.java:280)
+    at org.apache.oozie.DagEngine.submitJobFromCoordinator(DagEngine.java:131)
+    at org.apache.oozie.command.coord.CoordActionStartXCommand.execute(CoordActionStartXCommand.java:182)
+    at org.apache.oozie.command.coord.CoordActionStartXCommand.execute(CoordActionStartXCommand.java:63)
+    at org.apache.oozie.command.XCommand.call(XCommand.java:280)
+    at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:175)
+    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
+    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
+    at java.lang.Thread.run(Thread.java:722)
+Caused by: java.lang.IllegalStateException: Variable substitution depth too large: 20 ${dniInputDir}
+    at org.apache.oozie.util.XConfiguration.substituteVars(XConfiguration.java:172)
+    at org.apache.oozie.util.XConfiguration.get(XConfiguration.java:120)
+    at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:171)
+    ... 10 more
+2014-02-27 02:06:47,499  WARN CoordActionStartXCommand:542 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] can not create DagEngine for submitting jobs
+org.apache.oozie.DagEngineException: E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+    at org.apache.oozie.DagEngine.submitJobFromCoordinator(DagEngine.java:136)
+    at org.apache.oozie.command.coord.CoordActionStartXCommand.execute(CoordActionStartXCommand.java:182)
+    at org.apache.oozie.command.coord.CoordActionStartXCommand.execute(CoordActionStartXCommand.java:63)
+    at org.apache.oozie.command.XCommand.call(XCommand.java:280)
+    at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:175)
+    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
+    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
+    at java.lang.Thread.run(Thread.java:722)
+Caused by: org.apache.oozie.command.CommandException: E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+    at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:252)
+    at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:72)
+    at org.apache.oozie.command.XCommand.call(XCommand.java:280)
+    at org.apache.oozie.DagEngine.submitJobFromCoordinator(DagEngine.java:131)
+    ... 7 more
+Caused by: java.lang.IllegalStateException: Variable substitution depth too large: 20 ${dniInputDir}
+    at org.apache.oozie.util.XConfiguration.substituteVars(XConfiguration.java:172)
+    at org.apache.oozie.util.XConfiguration.get(XConfiguration.java:120)
+    at org.apache.oozie.command.wf.SubmitXCommand.execute(SubmitXCommand.java:171)
+    ... 10 more
+2014-02-27 02:06:47,499 ERROR CoordActionStartXCommand:536 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Failing the action 0601839-140127221758655-oozie-wrkf-C@2. Because E0803 : E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}
+2014-02-27 02:06:49,215 DEBUG CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Queuing [1] commands with delay [0]ms
+2014-02-27 02:06:49,215 DEBUG CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Released lock for [0601839-140127221758655-oozie-wrkf-C] in [coord_action_start]
+2014-02-27 02:06:49,215 INFO CoordActionStartXCommand:545 [pool-2-thread-236] - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0601839-140127221758655-oozie-wrkf-C] ACTION[0601839-140127221758655-oozie-wrkf-C@2] Released lock for [0601839-140127221758655-oozie-wrkf-C] in [coord_action_start]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/docs/src/site/twiki/DG_CommandLineTool.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_CommandLineTool.twiki b/docs/src/site/twiki/DG_CommandLineTool.twiki
index 222389e..f3231c7 100644
--- a/docs/src/site/twiki/DG_CommandLineTool.twiki
+++ b/docs/src/site/twiki/DG_CommandLineTool.twiki
@@ -67,6 +67,8 @@ usage:
                                       coordinator job; new pausetime value for changing a bundle job
                 -verbose              verbose mode
                 -update               Update coordinator definition and properties
+                -logfilter            job log search parameter. Can be specified as -logfilter opt1=val1;opt2=val1;opt3=val1.
+                                      Supported options are recent, start, end, loglevel, text, limit and debug.
 .
       oozie jobs <OPTIONS> : jobs status
                  -auth <arg>          select authentication type [SIMPLE|KERBEROS]
@@ -558,6 +560,71 @@ $ oozie job -log <coord_job_id> [-action 1, 3-4, 7-40] (-action is optional.)
 
 </verbatim>
 
+---+++ Filtering the server logs with logfilter options
+
+Users can provide multiple options to filter logs using -logfilter opt1=val1;opt2=val1;opt3=val1. This can be used to fetch only the logs of interest faster, as fetching Oozie server logs is slow due to the overhead of pattern matching.
+Available options are:
+   * recent: Specify recent hours/min of logs to scan. The recent offset specified is taken relative to the =end= time specified, job end time or the current system time if the job is still running in that order of precedence. For eg: recent=3h or recent=30m will fetch logs starting 3 hours/30 minutes before the end time and up to the end time. H/h is used to denote hour and M/m is used to denote minutes. If no suffix is specified default is hours.
+   * start: Start time for scanning logs. Default is start time of the job. User can provide a valid date or offset similar to =recent= option. Valid date formats are "yyyy-MM-dd'T'HH:mm'Z'" and "yyyy-MM-dd HH:mm:ss,SSS". When an offset is specified, it is calculated relative to the start time of the job. For eg: start=2h will fetch logs starting 2 hours after the job was started.
+   * end: End time for scanning logs. Default is end time of the job or current system time if the job is still running. User can provide a valid date or offset similar to =start= option. When an offset is specified, it is calculated relative to the start time, i.e. the job start time. For eg: end=2h will fetch logs between the start time and the start time plus 2 hours.
+   * loglevel : Multiple log levels separated by "|" can be specified. Supported log levels are ALL, DEBUG, ERROR, INFO, TRACE, WARN, FATAL.
+   * text: String to search in logs.
+   * limit : Limit the number of lines to be searched. Log search will end when n lines (excluding stack-trace) have been matched.
+   * debug : Prints debug information on the log files, time ranges and patterns being searched for. Can be used to debug if expected logs are not shown with the filter options provided.
+
+Examples.
+Searching logs with log level ERROR or WARN will return only Error and Warning log entries (with stack-trace).
+This is useful if a job has failed and the user wants to find error logs with exceptions.
+<verbatim>
+
+$ ./oozie job -log 0000006-140319184715726-oozie-puru-W  -logfilter loglevel=WARN\;limit=3 -oozie http://localhost:11000/oozie/
+2014-03-20 10:01:52,977  WARN ActionStartXCommand:542 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[map-reduce-wf] JOB[0000006-140319184715726-oozie-puru-W] ACTION[0000006-140319184715726-oozie-puru-W@:start:] [***0000006-140319184715726-oozie-puru-W@:start:***]Action status=DONE
+2014-03-20 10:01:52,977  WARN ActionStartXCommand:542 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[map-reduce-wf] JOB[0000006-140319184715726-oozie-puru-W] ACTION[0000006-140319184715726-oozie-puru-W@:start:] [***0000006-140319184715726-oozie-puru-W@:start:***]Action updated in DB!
+2014-03-20 10:01:53,189  WARN ActionStartXCommand:542 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[map-reduce-wf] JOB[0000006-140319184715726-oozie-puru-W] ACTION[0000006-140319184715726-oozie-puru-W@mr-node-1] Error starting action [mr-node-1]. ErrorType [TRANSIENT], ErrorCode [JA009], Message [JA009: java.io.IOException: java.io.IOException: Queue "aaadefault" does not exist
+	at org.apache.hadoop.mapred.JobTracker.submitJob(JobTracker.java:3615)
+	at org.apache.hadoop.mapred.JobTracker.submitJob(JobTracker.java:3561)
+	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
+	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
+	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
+	at java.lang.reflect.Method.invoke(Method.java:597)
+	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:587)
+	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1432)
+	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1428)
+	at java.security.AccessController.doPrivileged(Native Method)
+	at javax.security.auth.Subject.doAs(Subject.java:394)
+	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
+	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1426)
+Caused by: java.io.IOException: Queue "aaadefault" does not exist
+	at org.apache.hadoop.mapred.JobInProgress.<init>(JobInProgress.java:433)
+	at org.apache.hadoop.mapred.JobTracker.submitJob(JobTracker.java:3613)
+	... 12 more
+$
+</verbatim>
+
+Search with specific text and recent option.
+<verbatim>
+$ ./oozie job -log 0000003-140319184715726-oozie-puru-C  -logfilter text=Missing\;limit=4\;recent=1h -oozie http://localhost:11000/oozie/
+2014-03-20 09:59:50,329  INFO CoordActionInputCheckXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000003-140319184715726-oozie-puru-C] ACTION[0000003-140319184715726-oozie-puru-C@1] [0000003-140319184715726-oozie-puru-C@1]::CoordActionInputCheck:: Missing deps:hdfs://localhost:9000/user/purushah/examples/input-data/rawLogs/
+2014-03-20 09:59:50,330  INFO CoordActionInputCheckXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000003-140319184715726-oozie-puru-C] ACTION[0000003-140319184715726-oozie-puru-C@1] [0000003-140319184715726-oozie-puru-C@1]::ActionInputCheck:: In checkListOfPaths: hdfs://localhost:9000/user/purushah/examples/input-data/rawLogs/ is Missing.
+2014-03-20 10:02:19,087  INFO CoordActionInputCheckXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000003-140319184715726-oozie-puru-C] ACTION[0000003-140319184715726-oozie-puru-C@2] [0000003-140319184715726-oozie-puru-C@2]::CoordActionInputCheck:: Missing deps:hdfs://localhost:9000/user/purushah/examples/input-data/rawLogs/
+2014-03-20 10:02:19,088  INFO CoordActionInputCheckXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000003-140319184715726-oozie-puru-C] ACTION[0000003-140319184715726-oozie-puru-C@2] [0000003-140319184715726-oozie-puru-C@2]::ActionInputCheck:: In checkListOfPaths: hdfs://localhost:9000/user/purushah/examples/input-data/rawLogs/ is Missing.
+$
+</verbatim>
+
+Search example with specific date range.
+<verbatim>
+$ ./oozie job -log 0000003-140319184715726-oozie-puru-C  -logfilter "start=2014-03-20 10:00:57,063;end=2014-03-20 10:10:57,063" -oozie http://localhost:11000/oozie/
+2014-03-20 10:00:57,063  INFO CoordActionUpdateXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[-] APP[-] JOB[0000003-140319184715726-oozie-puru-C] ACTION[0000003-140319184715726-oozie-puru-C@1] Updating Coordintaor action id :0000003-140319184715726-oozie-puru-C@1 status  to KILLED, pending = 0
+2014-03-20 10:02:18,967  INFO CoordMaterializeTransitionXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[aggregator-coord] JOB[0000003-140319184715726-oozie-puru-C] ACTION[-] materialize actions for tz=Coordinated Universal Time,
+ start=Thu Dec 31 18:00:00 PST 2009, end=Thu Dec 31 19:00:00 PST 2009,
+ timeUnit 12,
+ frequency :60:MINUTE,
+ lastActionNumber 1
+2014-03-20 10:02:18,971  WARN CoordELFunctions:542 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[aggregator-coord] JOB[0000003-140319184715726-oozie-puru-C] ACTION[-] If the initial instance of the dataset is later than the current-instance specified, such as coord:current(-200) in this case, an empty string is returned. This means that no data is available at the current-instance specified by the user and the user could try modifying his initial-instance to an earlier time.
+2014-03-20 10:02:18,975  INFO CoordMaterializeTransitionXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[aggregator-coord] JOB[0000003-140319184715726-oozie-puru-C] ACTION[-] [0000003-140319184715726-oozie-puru-C]: all actions have been materialized, set pending to true
+2014-03-20 10:02:18,975  INFO CoordMaterializeTransitionXCommand:539 - SERVER[ ] USER[-] GROUP[-] TOKEN[] APP[aggregator-coord] JOB[0000003-140319184715726-oozie-puru-C] ACTION[-] Coord Job status updated to = RUNNING
+</verbatim>
+
 ---+++ Dryrun of Coordinator Job
 
 * This feature is only supported in Oozie 2.0 or later.

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/docs/src/site/twiki/DG_WorkflowReRun.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_WorkflowReRun.twiki b/docs/src/site/twiki/DG_WorkflowReRun.twiki
index e6cca80..88d982b 100644
--- a/docs/src/site/twiki/DG_WorkflowReRun.twiki
+++ b/docs/src/site/twiki/DG_WorkflowReRun.twiki
@@ -16,6 +16,11 @@
    * If secured hadoop version is used, the following two properties needs to be specified as well
       * mapreduce.jobtracker.kerberos.principal
       * dfs.namenode.kerberos.principal.
+   * Configurations can be passed as -D param.
+<verbatim>
+$ oozie job -oozie http://localhost:11000/oozie -rerun 14-20090525161321-oozie-joe -Doozie.wf.rerun.skip.nodes=<>
+</verbatim>
+
 ---++ Pre-Conditions
 
    * Workflow with id wfId should exist.

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/docs/src/site/twiki/WebServicesAPI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/WebServicesAPI.twiki b/docs/src/site/twiki/WebServicesAPI.twiki
index c93fc7f..6730255 100644
--- a/docs/src/site/twiki/WebServicesAPI.twiki
+++ b/docs/src/site/twiki/WebServicesAPI.twiki
@@ -1344,6 +1344,17 @@ Content-Type: text/plain;charset=UTF-8
 ...
 </verbatim>
 
+---++++ Filtering the server logs with logfilter options
+Users can provide multiple options to filter logs using -logfilter opt1=val1;opt2=val1;opt3=val1. This can be used to fetch only the logs of interest faster, as fetching Oozie server logs is slow due to the overhead of pattern matching.
+
+<verbatim>
+GET /oozie/v1/job/0000003-140319184715726-oozie-puru-C?show=log&logfilter=limit=3;loglevel=WARN
+</verbatim>
+
+
+Refer to the [[DG_CommandLineTool#Filtering_the_server_logs_with_logfilter_options][Filtering the server logs with logfilter options]] for more details.
+
+
 ---++++ Job graph
 
 An =HTTP GET= request returns the image of the workflow DAG (rendered as a PNG image).

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index d380863..eff6224 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1737 Oozie log streaming is slow (puru via rohini)
 OOZIE-1794 java-opts and java-opt in the Java action don't always work properly in YARN (rkanter)
 OOZIE-1799 Document hcatalog integration steps for Oozie in a secure cluster (venkatnrangan via bzhang)
 OOZIE-1585 Upgrade oozie to pig 0.12.1 (bzhang)

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/webapp/src/main/webapp/oozie-console.js
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/oozie-console.js b/webapp/src/main/webapp/oozie-console.js
index 5f4f847..764d888 100644
--- a/webapp/src/main/webapp/oozie-console.js
+++ b/webapp/src/main/webapp/oozie-console.js
@@ -43,11 +43,15 @@ Ext.override(Ext.Component, {
     stateful : false
 });
 
-function getLogs(url, textArea, shouldParseResponse, errorMsg) {
+function getLogs(url, searchFilter, logStatus, textArea, shouldParseResponse, errorMsg) {
     textArea.getEl().dom.value = '';
+    url = url + "&logfilter=" +searchFilter;
+    logStatus.getEl().dom.innerText = "Log status : Loading... done";
 
     if (!errorMsg) {
         errorMsg = "Fatal Error. Can't load logs.";
+        logStatus.getEl().dom.innerText = "Log status : Errored out";
+
     }
     if (!window.XMLHttpRequest) {
         Ext.Ajax.request({
@@ -56,13 +60,17 @@ function getLogs(url, textArea, shouldParseResponse, errorMsg) {
             success : function(response, request) {
                 if (shouldParseResponse) {
                     processAndDisplayLog(response.responseText, textArea);
+                    logStatus.getEl().dom.innerText = "Log status : Loading... done";
+
                 } else {
                     textArea.getEl().dom.value = response.responseText;
+                    logStatus.getEl().dom.innerText = "Log status : Loading... done";
                 }
             },
 
             failure : function() {
                 textArea.getEl().dom.value = errorMsg;
+                logStatus.getEl().dom.innerText = "Log status : Errored out";
             }
         });
 
@@ -85,6 +93,11 @@ function getLogs(url, textArea, shouldParseResponse, errorMsg) {
                 if (xhr.status != 200 && xhr.status != 0) {
                     var errorText = xhr.getResponseHeader('oozie-error-message');
                     textArea.getEl().dom.value = "Error :\n" + (errorText ? errorText : xhr.responseText);
+                    logStatus.getEl().dom.innerText = "Log status : Errored out";
+
+                }
+                if (xhr.readyState =4  && xhr.status == 200) {
+                    logStatus.getEl().dom.innerText = "Log status : Loading... done";
                 }
             } catch (e) {
             }
@@ -349,7 +362,9 @@ function jobDetailsPopup(response, request) {
         width: 1035,
         height: 400,
         autoScroll: true,
-        emptyText: "Loading..."
+        emptyText: "To optimize log searching, you can provide search filter options as opt1=val1;opt2=val1;opt3=val1.\n" +
+        "Available options are recent, start, end, loglevel, text, limit and debug.\n" +
+        "For more detail refer documentation (/oozie/docs/DG_CommandLineTool.html#Filtering_the_server_logs_with_logfilter_options)"
     });
     function fetchDefinition(workflowId) {
         Ext.Ajax.request({
@@ -361,8 +376,8 @@ function jobDetailsPopup(response, request) {
 
         });
     }
-    function fetchLogs(workflowId) {
-        getLogs(getOozieBase() + 'job/' + workflowId + "?show=log", jobLogArea, false, null);
+    function fetchLogs(workflowId, logStatus) {
+        getLogs(getOozieBase() + 'job/' + workflowId + "?show=log", searchFilterBox.getValue(), logStatus, jobLogArea, false, null);
 
     }
     var jobDetails = eval("(" + response.responseText + ")");
@@ -747,6 +762,31 @@ function jobDetailsPopup(response, request) {
         autoScroll: true,
         style  : { overflow: 'auto', overflowX: 'hidden' }
     });
+
+    var searchFilter = new Ext.form.Label({
+        text : 'Enter Search Filter'
+    });
+
+    var searchFilterBox = new Ext.form.TextField({
+                 fieldLabel: 'searchFilterBox',
+                 name: 'searchFilterBox',
+                 width: 350,
+                 value: ''
+    });
+
+    var logStatus = new Ext.form.Label({
+        text : 'Log Status : '
+    });
+
+
+    var getLogButton = new Ext.Button({
+        text: 'Get Logs',
+        ctCls: 'x-btn-over',
+        handler: function() {
+            fetchLogs(workflowId, logStatus);
+            }
+    });
+
     var isLoadedDAG = false;
 
     var jobDetailsTab = new Ext.TabPanel({
@@ -774,13 +814,7 @@ function jobDetailsPopup(response, request) {
         }, {
             title: 'Job Log',
             items: jobLogArea,
-            tbar: [ {
-                text: "&nbsp;&nbsp;&nbsp;",
-                icon: 'ext-2.2/resources/images/default/grid/refresh.gif',
-                handler: function() {
-                    fetchLogs(workflowId);
-                }
-            }]
+            tbar: [ searchFilter, searchFilterBox, getLogButton, {xtype: 'tbfill'}, logStatus]
         }, {
             title: 'Job DAG',
             items: imageContainer,
@@ -799,9 +833,6 @@ function jobDetailsPopup(response, request) {
             jobs_grid.setVisible(true);
             return;
         }
-        if (selectedTab.title == 'Job Log') {
-            fetchLogs(workflowId);
-        }
         else if (selectedTab.title == 'Job Definition') {
             fetchDefinition(workflowId);
         } else if(selectedTab.title == 'Job DAG') {
@@ -853,13 +884,17 @@ function coordJobDetailsPopup(response, request) {
         height: 400,
         autoScroll: true,
         emptyText: "Enter the list of actions in the format similar to 1,3-4,7-40 to get logs for specific coordinator actions. " +
-                   "To get the log for the coordinator job, leave the actions field empty."
+                   "To get the log for the coordinator job, leave the actions field empty.\n\n" +
+        "To optimize log searching, you can provide search filter options as opt1=val1;opt2=val1;opt3=val1.\n" +
+        "Available options are recent, start, end, loglevel, text, limit and debug.\n" +
+        "For more detail refer documentation (/oozie/docs/DG_CommandLineTool.html#Filtering_the_server_logs_with_logfilter_options)"
     });
     var getLogButton = new Ext.Button({
-        text: 'Get logs',
-	    handler: function() {
+        text: 'Get Logs',
+        ctCls: 'x-btn-over',
+        handler: function() {
             fetchLogs(coordJobId, actionsTextBox.getValue());
-	    }
+        }
     });
     var actionsTextBox = new Ext.form.TextField({
              fieldLabel: 'ActionsList',
@@ -872,6 +907,22 @@ function coordJobDetailsPopup(response, request) {
         text : 'Enter action list : '
     });
 
+    var searchFilter = new Ext.form.Label({
+        text : 'Enter Search Filter'
+    });
+
+    var searchFilterBox = new Ext.form.TextField({
+                 fieldLabel: 'searchFilterBox',
+                 name: 'searchFilterBox',
+                 width: 350,
+                 value: ''
+    });
+
+    var logStatus = new Ext.form.Label({
+        text : 'Log Status : '
+    });
+
+
     function fetchDefinition(coordJobId) {
         Ext.Ajax.request({
             url: getOozieBase() + 'job/' + coordJobId + "?show=definition",
@@ -884,10 +935,10 @@ function coordJobDetailsPopup(response, request) {
     function fetchLogs(coordJobId, actionsList) {
         if (actionsList == '') {
             getLogs(getOozieBase() + 'job/' + coordJobId + "?show=log",
-                    jobLogArea, true, null);
+                    searchFilterBox.getValue(), logStatus, jobLogArea, true, null);
         } else {
             getLogs(getOozieBase() + 'job/' + coordJobId
-                    + "?show=log&type=action&scope=" + actionsList, jobLogArea,
+                    + "?show=log&type=action&scope=" + actionsList, searchFilterBox.getValue(), logStatus, jobLogArea,
                     true,
                     'Action List format is wrong. Format should be similar to 1,3-4,7-40');
         }
@@ -1279,9 +1330,9 @@ function coordJobDetailsPopup(response, request) {
             title: 'Coord Job Info',
             items: fs
         },{
-	   title: 'Coord Job Definition',
+       title: 'Coord Job Definition',
             items: jobDefinitionArea
-	},{
+    },{
             title: 'Coord Job Configuration',
             items: new Ext.form.TextArea({
             fieldLabel: 'Configuration',
@@ -1296,7 +1347,7 @@ function coordJobDetailsPopup(response, request) {
            title: 'Coord Job Log',
            items: jobLogArea,
            tbar: [
-                   actionsText,actionsTextBox, getLogButton]
+                  actionsText,actionsTextBox, searchFilter, searchFilterBox, getLogButton, {xtype: 'tbfill'}, logStatus]
        },{
            title: 'Coord Action Reruns',
            items: rerunsUnit,
@@ -1331,7 +1382,7 @@ function coordJobDetailsPopup(response, request) {
 }
 
 function bundleJobDetailsPopup(response, request) {
-	var jobDefinitionArea = new Ext.form.TextArea({
+    var jobDefinitionArea = new Ext.form.TextArea({
         fieldLabel: 'Definition',
         editable: false,
         name: 'definition',
@@ -1499,9 +1550,36 @@ function bundleJobDetailsPopup(response, request) {
         width: 1010,
         height: 400,
         autoScroll: true,
-        emptyText: "Loading..."
+        emptyText: "To optimize log searching, you can provide search filter options as opt1=val1;opt2=val1;opt3=val1.\n" +
+        "Available options are recent, start, end, loglevel, text, limit and debug.\n" +
+        "For more detail refer documentation (/oozie/docs/DG_CommandLineTool.html#Filtering_the_server_logs_with_logfilter_options)"
+    });
+
+    var searchFilter = new Ext.form.Label({
+        text : 'Enter Search Filter'
+    });
+
+    var searchFilterBox = new Ext.form.TextField({
+                 fieldLabel: 'searchFilterBox',
+                 name: 'searchFilterBox',
+                 width: 350,
+                 value: ''
+    });
+
+    var logStatus = new Ext.form.Label({
+        text : 'Log Status : '
+    });
+
+
+    var getLogButton = new Ext.Button({
+        text: 'Get Logs',
+        ctCls: 'x-btn-over',
+        handler: function() {
+            getLogs(getOozieBase() + 'job/' + bundleJobId + "?show=log", searchFilterBox.getValue(), logStatus, jobLogArea, false, null);
+        }
     });
 
+
     var jobDetailsTab = new Ext.TabPanel({
         activeTab: 0,
         autoHeight: true,
@@ -1511,8 +1589,8 @@ function bundleJobDetailsPopup(response, request) {
             items: fs
         },{
            title: 'Bundle Job Definition',
-	   		 items: jobDefinitionArea
-	    },{
+             items: jobDefinitionArea
+        },{
             title: 'Bundle Job Configuration',
              items: new Ext.form.TextArea({
               fieldLabel: 'Configuration',
@@ -1526,14 +1604,8 @@ function bundleJobDetailsPopup(response, request) {
       },{
            title: 'Bundle Job Log',
            items: jobLogArea,
-             tbar: [ {
-                text: "&nbsp;&nbsp;&nbsp;",
-                icon: 'ext-2.2/resources/images/default/grid/refresh.gif',
-                handler: function() {
-                    getLogs(getOozieBase() + 'job/' + bundleJobId + "?show=log", jobLogArea, false, null);
-                }
-            }]
-          }]
+             tbar: [searchFilter, searchFilterBox, getLogButton, {xtype: 'tbfill'}, logStatus]
+      }]
      });
 
     jobDetailsTab.addListener("tabchange", function(panel, selectedTab) {
@@ -1541,9 +1613,6 @@ function bundleJobDetailsPopup(response, request) {
             coord_jobs_grid.setVisible(true);
             return;
         }
-        else if (selectedTab.title == 'Bundle Job Log') {
-            getLogs(getOozieBase() + 'job/' + bundleJobId + "?show=log", jobLogArea, false, null);
-        }
         else if (selectedTab.title == 'Bundle Job Definition') {
             fetchDefinition(bundleJobId);
         }
@@ -1662,7 +1731,7 @@ function showConfigurationInWindow(dataObject, windowTitle) {
  * create the coord jobs store
  */
 var coord_jobs_store = new Ext.data.JsonStore({
-	baseParams: {
+    baseParams: {
         jobtype: "coord",
         filter: "status=RUNNING;status=RUNNINGWITHERROR",
         timezone: getTimeZone()
@@ -1704,7 +1773,7 @@ jobs_store.proxy.conn.method = "GET";
  */
 var bundle_jobs_store = new Ext.data.JsonStore({
 
-	baseParams: {
+    baseParams: {
         jobtype: "bundle",
         filter: "status=RUNNING;status=RUNNINGWITHERROR",
         timezone: getTimeZone()
@@ -2319,7 +2388,7 @@ function initConsole() {
             width: 80,
             sortable: true,
             dataIndex: 'status'
-	}, {
+    }, {
             header: "User",
             width: 80,
             sortable: true,


[2/2] git commit: OOZIE-1737 Oozie log streaming is slow (puru via rohini)

Posted by ro...@apache.org.
OOZIE-1737 Oozie log streaming is slow (puru via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f9176a01
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f9176a01
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f9176a01

Branch: refs/heads/master
Commit: f9176a01be214ecf6409e239f76806ed01be179d
Parents: 2ef1711
Author: Rohini Palaniswamy <ro...@yahoo-inc.com>
Authored: Thu Apr 24 14:42:10 2014 -0700
Committer: Rohini Palaniswamy <ro...@yahoo-inc.com>
Committed: Thu Apr 24 14:42:10 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/oozie/cli/OozieCLI.java     |  14 +-
 .../org/apache/oozie/client/OozieClient.java    |  27 +-
 .../apache/oozie/client/rest/RestConstants.java |   2 +
 .../java/org/apache/oozie/BundleEngine.java     |  26 +-
 .../java/org/apache/oozie/BundleJobBean.java    |  18 +
 .../org/apache/oozie/CoordinatorEngine.java     |  44 +-
 .../main/java/org/apache/oozie/DagEngine.java   |  28 +-
 .../oozie/service/DagXLogInfoService.java       |  10 +-
 .../org/apache/oozie/service/XLogService.java   |  10 +-
 .../oozie/service/XLogStreamingService.java     |   5 +-
 .../oozie/service/ZKXLogStreamingService.java   |  10 +-
 .../org/apache/oozie/servlet/V1JobServlet.java  |  23 +-
 .../util/SimpleTimestampedMessageParser.java    |   2 +-
 .../oozie/util/TimestampedMessageParser.java    |  25 +-
 .../java/org/apache/oozie/util/XLogFilter.java  | 336 ++++++++++++++++
 .../org/apache/oozie/util/XLogStreamer.java     | 180 ++-------
 .../apache/oozie/util/XLogUserFilterParam.java  | 300 ++++++++++++++
 core/src/main/resources/oozie-default.xml       |  21 +
 .../oozie/TestCoordinatorEngineStreamLog.java   |   9 +-
 .../oozie/service/TestXLogStreamingService.java |  26 +-
 .../service/TestZKXLogStreamingService.java     |  38 +-
 .../org/apache/oozie/util/TestLogStreamer.java  |  77 ++--
 .../TestSimplifiedTimestampedMessageParser.java |  32 +-
 .../util/TestTimestampedMessageParser.java      |  62 +--
 .../org/apache/oozie/util/TestXLogFilter.java   |  27 +-
 .../oozie/util/TestXLogUserFilterParam.java     | 403 +++++++++++++++++++
 .../src/test/resources/userLogFilterTestlog.log |  62 +++
 docs/src/site/twiki/DG_CommandLineTool.twiki    |  67 +++
 docs/src/site/twiki/DG_WorkflowReRun.twiki      |   5 +
 docs/src/site/twiki/WebServicesAPI.twiki        |  11 +
 release-log.txt                                 |   1 +
 webapp/src/main/webapp/oozie-console.js         | 151 +++++--
 32 files changed, 1671 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
index f26e33b..5260ab7 100644
--- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
+++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
@@ -302,6 +302,10 @@ public class OozieCLI {
         Option timezone = new Option(TIME_ZONE_OPTION, true,
                 "use time zone with the specified ID (default GMT).\nSee 'oozie info -timezones' for a list");
         Option log = new Option(LOG_OPTION, true, "job log");
+        Option logFilter = new Option(
+                RestConstants.LOG_FILTER_OPTION, true,
+                "job log search parameter. Can be specified as -logfilter opt1=val1;opt2=val1;opt3=val1. "
+                + "Supported options are recent, start, end, loglevel, text, limit and debug");
         Option definition = new Option(DEFINITION_OPTION, true, "job definition");
         Option config_content = new Option(CONFIG_CONTENT_OPTION, true, "job configuration");
         Option verbose = new Option(VERBOSE_OPTION, false, "verbose mode");
@@ -358,6 +362,7 @@ public class OozieCLI {
         jobOptions.addOption(rerun_nocleanup);
         jobOptions.addOption(getAllWorkflows);
         jobOptions.addOptionGroup(actions);
+        jobOptions.addOption(logFilter);
         addAuthOptions(jobOptions);
         jobOptions.addOption(showdiff);
 
@@ -1056,6 +1061,10 @@ public class OozieCLI {
             }
             else if (options.contains(LOG_OPTION)) {
                 PrintStream ps = System.out;
+                String logFilter = null;
+                if (options.contains(RestConstants.LOG_FILTER_OPTION)) {
+                    logFilter = commandLine.getOptionValue(RestConstants.LOG_FILTER_OPTION);
+                }
                 if (commandLine.getOptionValue(LOG_OPTION).contains("-C")) {
                     String logRetrievalScope = null;
                     String logRetrievalType = null;
@@ -1068,7 +1077,8 @@ public class OozieCLI {
                         logRetrievalScope = commandLine.getOptionValue(DATE_OPTION);
                     }
                     try {
-                        wc.getJobLog(commandLine.getOptionValue(LOG_OPTION), logRetrievalType, logRetrievalScope, ps);
+                        wc.getJobLog(commandLine.getOptionValue(LOG_OPTION), logRetrievalType, logRetrievalScope,
+                                logFilter, ps);
                     }
                     finally {
                         ps.close();
@@ -1076,7 +1086,7 @@ public class OozieCLI {
                 }
                 else {
                     if (!options.contains(ACTION_OPTION) && !options.contains(DATE_OPTION)) {
-                        wc.getJobLog(commandLine.getOptionValue(LOG_OPTION), null, null, ps);
+                        wc.getJobLog(commandLine.getOptionValue(LOG_OPTION), null, null, logFilter, ps);
                     }
                     else {
                         throw new OozieCLIException("Invalid options provided for log retrieval. " + ACTION_OPTION

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/client/src/main/java/org/apache/oozie/client/OozieClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java
index e5afdc7..211b5ad 100644
--- a/client/src/main/java/org/apache/oozie/client/OozieClient.java
+++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java
@@ -910,21 +910,35 @@ public class OozieClient {
      * @param jobId job Id.
      * @param logRetrievalType Based on which filter criteria the log is retrieved
      * @param logRetrievalScope Value for the retrieval type
+     * @param logFilter log filter expression, e.g. "opt1=val1;opt2=val2"; may be null
+     * @param ps PrintStream of the command line interface
+     * @throws OozieClientException thrown if the job log could not be retrieved.
+     */
+    public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter,
+            PrintStream ps) throws OozieClientException {
+        new JobLog(jobId, logRetrievalType, logRetrievalScope, logFilter, ps).call();
+    }
+
+    /**
+     * Get the log of a job.
+     *
+     * @param jobId job Id.
+     * @param logRetrievalType Based on which filter criteria the log is retrieved
+     * @param logRetrievalScope Value for the retrieval type
      * @param ps Printstream of command line interface
      * @throws OozieClientException thrown if the job info could not be retrieved.
      */
     public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps)
             throws OozieClientException {
-        new JobLog(jobId, logRetrievalType, logRetrievalScope, ps).call();
+        getJobLog(jobId, logRetrievalType, logRetrievalScope, null, ps);
     }
 
     private class JobLog extends JobMetadata {
         JobLog(String jobId) {
             super(jobId, RestConstants.JOB_SHOW_LOG);
         }
-
-        JobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) {
-            super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, ps);
+        JobLog(String jobId, String logRetrievalType, String logRetrievalScope, String logFilter, PrintStream ps) {
+            super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, logFilter, ps);
         }
     }
 
@@ -984,10 +998,11 @@ public class OozieClient {
                     metaType));
         }
 
-        JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, PrintStream ps) {
+        JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, String logFilter,
+                PrintStream ps) {
             super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
                     metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM,
-                    logRetrievalScope));
+                    logRetrievalScope, RestConstants.LOG_FILTER_OPTION, logFilter));
             printStream = ps;
         }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
index b0dd6cb..808f9b2 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
@@ -174,4 +174,6 @@ public interface RestConstants {
 
     public static final String ALL_WORKFLOWS_FOR_COORD_ACTION = "allruns";
 
+    public static final String LOG_FILTER_OPTION = "logfilter";
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java
index 5bf1538..67f4aba 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
@@ -51,9 +52,10 @@ import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogStreamingService;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogUserFilterParam;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogStreamer;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -241,20 +243,26 @@ public class BundleEngine extends BaseEngine {
      * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer)
      */
     @Override
-    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, BundleEngineException {
-        XLogStreamer.Filter filter = new XLogStreamer.Filter();
-        filter.setParameter(DagXLogInfoService.JOB, jobId);
+    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
+            BundleEngineException {
 
         BundleJobBean job;
         try {
+            XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
+            filter.setParameter(DagXLogInfoService.JOB, jobId);
             job = new BundleJobXCommand(jobId).call();
+            Date lastTime = null;
+            if (job.isTerminalStatus()) {
+                lastTime = job.getLastModifiedTime();
+            }
+            if (lastTime == null) {
+                lastTime = new Date();
+            }
+            Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer, params);
         }
-        catch (CommandException ex) {
-            throw new BundleEngineException(ex);
+        catch (Exception ex) {
+            throw new IOException(ex);
         }
-
-        Services.get().get(XLogStreamingService.class)
-                .streamLog(filter, job.getCreatedTime(), new Date(), writer, params);
     }
 
     /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/BundleJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java b/core/src/main/java/org/apache/oozie/BundleJobBean.java
index 4c37e57..0f1670a 100644
--- a/core/src/main/java/org/apache/oozie/BundleJobBean.java
+++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java
@@ -798,4 +798,22 @@ public class BundleJobBean implements Writable, BundleJob, JsonBean {
         return DateUtils.toDate(startTimestamp);
     }
 
+    /**
+     * @return true if the bundle job status is SUCCEEDED, FAILED, KILLED or DONEWITHERROR
+     */
+    public boolean isTerminalStatus() {
+        boolean isTerminal = false;
+        switch (getStatus()) {
+            case SUCCEEDED:
+            case FAILED:
+            case KILLED:
+            case DONEWITHERROR:
+                isTerminal = true;
+                break;
+            default:
+                isTerminal = false;
+                break;
+        }
+        return isTerminal;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index 81d98bf..e627a95 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
@@ -57,9 +58,10 @@ import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogStreamingService;
+import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogUserFilterParam;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogStreamer;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -271,12 +273,26 @@ public class CoordinatorEngine extends BaseEngine {
      * java.io.Writer)
      */
     @Override
-    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, BaseEngineException {
-        XLogStreamer.Filter filter = new XLogStreamer.Filter();
-        filter.setParameter(DagXLogInfoService.JOB, jobId);
+    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
+            BaseEngineException {
 
-        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
-        Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer, params);
+        try {
+            XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
+            filter.setParameter(DagXLogInfoService.JOB, jobId);
+            Date lastTime = null;
+            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
+            if (job.isTerminalStatus()) {
+                lastTime = job.getLastModifiedTime();
+            }
+            if (lastTime == null) {
+                lastTime = new Date();
+            }
+            Services.get().get(XLogStreamingService.class)
+                    .streamLog(filter, job.getCreatedTime(), lastTime, writer, params);
+        }
+        catch (Exception e) {
+            throw new IOException(e);
+        }
     }
 
     /**
@@ -296,7 +312,8 @@ public class CoordinatorEngine extends BaseEngine {
 
         Date startTime = null;
         Date endTime = null;
-        XLogStreamer.Filter filter = new XLogStreamer.Filter();
+        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
+
         filter.setParameter(DagXLogInfoService.JOB, jobId);
         if (logRetrievalScope != null && logRetrievalType != null) {
             // if coordinator action logs are to be retrieved based on action id range
@@ -371,6 +388,7 @@ public class CoordinatorEngine extends BaseEngine {
                     startTime = actionBean.getCreatedTime();
                     endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
                             .getLastModifiedTime();
+                    filter.setActionList(true);
                 }
                 else if (actionSet != null && actionSet.size() > 0) {
                     List<String> tempList = new ArrayList<String>(actionSet);
@@ -383,6 +401,7 @@ public class CoordinatorEngine extends BaseEngine {
                     startTime = getCoordAction(tempList.get(0)).getCreatedTime();
                     endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, tempList.get(0),
                             tempList.get(tempList.size() - 1));
+                    filter.setActionList(true);
                 }
             }
             // if coordinator action logs are to be retrieved based on date range
@@ -414,12 +433,12 @@ public class CoordinatorEngine extends BaseEngine {
                     orSeparatedActions.append(")");
                 }
                 filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
-
                 if (coordActionIdList != null && coordActionIdList.size() == 1) {
                     CoordinatorActionBean actionBean = getCoordAction(coordActionIdList.get(0));
                     startTime = actionBean.getCreatedTime();
                     endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
                             .getLastModifiedTime();
+                    filter.setActionList(true);
                 }
                 else if (coordActionIdList != null && coordActionIdList.size() > 0) {
                     Collections.sort(coordActionIdList, new Comparator<String>() {
@@ -431,6 +450,7 @@ public class CoordinatorEngine extends BaseEngine {
                     startTime = getCoordAction(coordActionIdList.get(0)).getCreatedTime();
                     endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, coordActionIdList.get(0),
                             coordActionIdList.get(coordActionIdList.size() - 1));
+                    filter.setActionList(true);
                 }
             }
         }
@@ -440,10 +460,14 @@ public class CoordinatorEngine extends BaseEngine {
                 startTime = job.getCreatedTime();
             }
             if (endTime == null) {
-                endTime = new Date();
+                if (job.isTerminalStatus()) {
+                    endTime = job.getLastModifiedTime();
+                }
+                if (endTime == null) {
+                    endTime = new Date();
+                }
             }
         }
-        //job.getActions()
         Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/DagEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java
index cad5ddd..f6e5b4f 100644
--- a/core/src/main/java/org/apache/oozie/DagEngine.java
+++ b/core/src/main/java/org/apache/oozie/DagEngine.java
@@ -17,9 +17,9 @@
  */
 package org.apache.oozie;
 
-import org.apache.oozie.util.XLogStreamer;
 import org.apache.oozie.service.XLogService;
 import org.apache.oozie.service.DagXLogInfoService;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowJob;
@@ -47,6 +47,8 @@ import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.CallableQueueService;
+import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogUserFilterParam;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XCallable;
 import org.apache.oozie.util.XConfiguration;
@@ -400,15 +402,21 @@ public class DagEngine extends BaseEngine {
      * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
      */
     @Override
-    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException, DagEngineException {
-        XLogStreamer.Filter filter = new XLogStreamer.Filter();
-        filter.setParameter(DagXLogInfoService.JOB, jobId);
-        WorkflowJob job = getJob(jobId);
-        Date lastTime = job.getEndTime();
-        if (lastTime == null) {
-            lastTime = job.getLastModifiedTime();
-        }
-        Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer, params);
+    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
+            DagEngineException {
+        try {
+            XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
+            filter.setParameter(DagXLogInfoService.JOB, jobId);
+            WorkflowJob job = getJob(jobId);
+            Date lastTime = job.getEndTime();
+            if (lastTime == null) {
+                lastTime = job.getLastModifiedTime();
+            }
+            Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer, params);
+        }
+        catch (Exception e) {
+            throw new IOException(e);
+        }
     }
 
     private static final Set<String> FILTER_NAMES = new HashSet<String>();

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/service/DagXLogInfoService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/DagXLogInfoService.java b/core/src/main/java/org/apache/oozie/service/DagXLogInfoService.java
index 1b18140..a5fad9d 100644
--- a/core/src/main/java/org/apache/oozie/service/DagXLogInfoService.java
+++ b/core/src/main/java/org/apache/oozie/service/DagXLogInfoService.java
@@ -17,7 +17,7 @@
  */
 package org.apache.oozie.service;
 
-import org.apache.oozie.util.XLogStreamer;
+import org.apache.oozie.util.XLogFilter;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XLog;
@@ -58,10 +58,10 @@ public class DagXLogInfoService implements Service {
         XLog.Info.defineParameter(JOB);
         XLog.Info.defineParameter(ACTION);
 
-        XLogStreamer.Filter.defineParameter(TOKEN);
-        XLogStreamer.Filter.defineParameter(APP);
-        XLogStreamer.Filter.defineParameter(JOB);
-        XLogStreamer.Filter.defineParameter(ACTION);
+        XLogFilter.defineParameter(TOKEN);
+        XLogFilter.defineParameter(APP);
+        XLogFilter.defineParameter(JOB);
+        XLogFilter.defineParameter(ACTION);
 
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/service/XLogService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/XLogService.java b/core/src/main/java/org/apache/oozie/service/XLogService.java
index f9383d0..403a089 100644
--- a/core/src/main/java/org/apache/oozie/service/XLogService.java
+++ b/core/src/main/java/org/apache/oozie/service/XLogService.java
@@ -20,10 +20,10 @@ package org.apache.oozie.service;
 import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.oozie.util.XLogFilter;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogStreamer;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.BuildInfo;
 import org.apache.oozie.ErrorCode;
@@ -174,9 +174,9 @@ public class XLogService implements Service, Instrumentable {
             XLog.Info.reset();
             XLog.Info.defineParameter(USER);
             XLog.Info.defineParameter(GROUP);
-            XLogStreamer.Filter.reset();
-            XLogStreamer.Filter.defineParameter(USER);
-            XLogStreamer.Filter.defineParameter(GROUP);
+            XLogFilter.reset();
+            XLogFilter.defineParameter(USER);
+            XLogFilter.defineParameter(GROUP);
 
             // Getting configuration for oozie log via WS
             ClassLoader cl = Thread.currentThread().getContextClassLoader();
@@ -282,7 +282,7 @@ public class XLogService implements Service, Instrumentable {
     public void destroy() {
         LogManager.shutdown();
         XLog.Info.reset();
-        XLogStreamer.Filter.reset();
+        XLogFilter.reset();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
index f77794e..1bf636b 100644
--- a/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.service;
 
+import org.apache.oozie.util.XLogFilter;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.XLogStreamer;
@@ -32,6 +33,7 @@ import java.util.Date;
 public class XLogStreamingService implements Service, Instrumentable {
     private static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService.";
     public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
+
     protected int bufferLen;
 
     /**
@@ -78,7 +80,7 @@ public class XLogStreamingService implements Service, Instrumentable {
      * @param params additional parameters from the request
      * @throws IOException thrown if the log cannot be streamed.
      */
-    public void streamLog(XLogStreamer.Filter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
+    public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
             throws IOException {
         XLogService xLogService = Services.get().get(XLogService.class);
         if (xLogService.getLogOverWS()) {
@@ -88,7 +90,6 @@ public class XLogStreamingService implements Service, Instrumentable {
         else {
             writer.write("Log streaming disabled!!");
         }
-
     }
 
     public int getBufferLen() {

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
index 8dc8b4b..7d53378 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
@@ -36,6 +36,7 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.util.AuthUrlClient;
+import org.apache.oozie.util.XLogFilter;
 import org.apache.oozie.util.SimpleTimestampedMessageParser;
 import org.apache.oozie.util.TimestampedMessageParser;
 import org.apache.oozie.util.XLog;
@@ -101,7 +102,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
      * @throws IOException thrown if the log cannot be streamed.
      */
     @Override
-    public void streamLog(XLogStreamer.Filter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
+    public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
             throws IOException {
         XLogService xLogService = Services.get().get(XLogService.class);
         if (xLogService.getLogOverWS()) {
@@ -132,7 +133,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
      * @param writer
      * @throws IOException
      */
-    private void collateLogs(XLogStreamer.Filter filter, Date startTime, Date endTime, Writer writer) throws IOException {
+    private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer) throws IOException {
         XLogService xLogService = Services.get().get(XLogService.class);
         List<String> badOozies = new ArrayList<String>();
         List<ServiceInstance<Map>> oozies = null;
@@ -176,6 +177,11 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
                 }
             }
 
+            // If the filter is in debug mode, write its debug message to the output.
+            if(filter.isDebugMode()){
+                writer.write(filter.getDebugMessage());
+            }
+
             // Add a message about any servers we couldn't contact
             if (!badOozies.isEmpty()) {
                 writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
index 5b65791..e03d7a7 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
@@ -267,15 +267,20 @@ public class V1JobServlet extends BaseJobServlet {
     @Override
     protected void streamJobLog(HttpServletRequest request, HttpServletResponse response) throws XServletException,
             IOException {
-        String jobId = getResourceName(request);
-        if (jobId.endsWith("-W")) {
-            streamWorkflowJobLog(request, response);
-        }
-        else if (jobId.endsWith("-B")) {
-            streamBundleJob(request, response);
+        try {
+            String jobId = getResourceName(request);
+            if (jobId.endsWith("-W")) {
+                streamWorkflowJobLog(request, response);
+            }
+            else if (jobId.endsWith("-B")) {
+                streamBundleJobLog(request, response);
+            }
+            else {
+                streamCoordinatorJobLog(request, response);
+            }
         }
-        else {
-            streamCoordinatorJobLog(request, response);
+        catch (Exception e) {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0307, e.getMessage());
         }
     }
 
@@ -987,7 +992,7 @@ public class V1JobServlet extends BaseJobServlet {
      * @param response servlet response
      * @throws XServletException
      */
-    private void streamBundleJob(HttpServletRequest request, HttpServletResponse response)
+    private void streamBundleJobLog(HttpServletRequest request, HttpServletResponse response)
             throws XServletException, IOException {
         BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request));
         String jobId = getResourceName(request);

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/util/SimpleTimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/SimpleTimestampedMessageParser.java b/core/src/main/java/org/apache/oozie/util/SimpleTimestampedMessageParser.java
index 0a5e8d1..a44e8f6 100644
--- a/core/src/main/java/org/apache/oozie/util/SimpleTimestampedMessageParser.java
+++ b/core/src/main/java/org/apache/oozie/util/SimpleTimestampedMessageParser.java
@@ -27,7 +27,7 @@ import java.io.IOException;
  */
 public class SimpleTimestampedMessageParser extends TimestampedMessageParser {
 
-    public SimpleTimestampedMessageParser(BufferedReader reader, XLogStreamer.Filter filter) {
+    public SimpleTimestampedMessageParser(BufferedReader reader, XLogFilter filter) {
         super(reader, filter);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java b/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
index 8a11780..8b7666d 100644
--- a/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
+++ b/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
@@ -20,8 +20,11 @@ package org.apache.oozie.util;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.Writer;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogStreamingService;
 
@@ -40,10 +43,11 @@ public class TimestampedMessageParser {
     protected BufferedReader reader;
     private String nextLine = null;
     private String lastTimestamp = null;
-    private XLogStreamer.Filter filter;
+    private XLogFilter filter;
     private boolean empty = false;
     private String lastMessage = null;
     private boolean patternMatched = false;
+    public int count = 0;
 
     /**
      * Creates a TimestampedMessageParser with the given BufferedReader and filter.
@@ -51,11 +55,11 @@ public class TimestampedMessageParser {
      * @param reader The BufferedReader to get the log messages from
      * @param filter The filter
      */
-    public TimestampedMessageParser(BufferedReader reader, XLogStreamer.Filter filter) {
+    public TimestampedMessageParser(BufferedReader reader, XLogFilter filter) {
         this.reader = reader;
         this.filter = filter;
         if (filter == null) {
-            filter = new XLogStreamer.Filter();
+            this.filter = filter = new XLogFilter(); // set the field too; assigning only the parameter left it null
         }
         filter.constructPattern();
     }
@@ -140,6 +144,21 @@ public class TimestampedMessageParser {
                 patternMatched = filter.matches(logParts);
             }
             if (patternMatched) {
+                if (filter.getLogLimit() != -1) {
+                    if (logParts != null) {
+                        if (count >= filter.getLogLimit()) {
+                            return null;
+                        }
+                        count++;
+                    }
+                }
+                if (logParts != null) {
+                    if (filter.getEndDate() != null) {
+                        //Ignore the milli second part
+                        if (logParts.get(0).substring(0, 19).compareTo(filter.getFormattedEndDate()) > 0)
+                            return null;
+                    }
+                }
                 return line;
             }
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/util/XLogFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogFilter.java b/core/src/main/java/org/apache/oozie/util/XLogFilter.java
new file mode 100644
index 0000000..0007c76
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/XLogFilter.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.service.Services;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Filter that will construct the regular expression that will be used to filter the log statement. And also checks if
+ * the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by "|")
+ * jobId appName actionId token
+ */
+public class XLogFilter {
+
+    private static final int LOG_TIME_BUFFER = 2; // in min
+    public static String MAX_ACTIONLIST_SCAN_DURATION = "oozie.service.XLogStreamingService.actionlist.max.log.scan.duration";
+    public static String MAX_SCAN_DURATION = "oozie.service.XLogStreamingService.max.log.scan.duration";
+    private Map<String, Integer> logLevels;
+    private final Map<String, String> filterParams;
+    private static List<String> parameters = new ArrayList<String>();
+    private boolean noFilter;
+    private Pattern filterPattern;
+    private XLogUserFilterParam userLogFilter;
+    private Date endDate;
+    private Date startDate;
+    private boolean isActionList = false;
+    private String formattedEndDate;
+
+    // TODO Patterns to be read from config file
+    private static final String DEFAULT_REGEX = "[^\\]]*";
+
+    public static final String ALLOW_ALL_REGEX = "(.*)";
+    private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
+    private static final String WHITE_SPACE_REGEX = "\\s+";
+    private static final String LOG_LEVEL_REGEX = "(\\w+)";
+    private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
+            + WHITE_SPACE_REGEX;
+    private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
+
+    public XLogFilter() {
+        this(new XLogUserFilterParam());
+    }
+
+    public XLogFilter(XLogUserFilterParam userLogFilter) {
+        filterParams = new HashMap<String, String>();
+        for (int i = 0; i < parameters.size(); i++) {
+            filterParams.put(parameters.get(i), DEFAULT_REGEX);
+        }
+        logLevels = null;
+        noFilter = true;
+        filterPattern = null;
+        setUserLogFilter(userLogFilter);
+    }
+
+    /** Sets the "|"-separated log levels to keep; entries that are not valid XLog.Level names are skipped. */
+    public void setLogLevel(String logLevel) {
+        if (logLevel != null && logLevel.trim().length() > 0) {
+            this.logLevels = new HashMap<String, Integer>();
+            for (String level : logLevel.split("\\|")) {
+                String s = level.trim().toUpperCase();
+                try {
+                    XLog.Level.valueOf(s);
+                }
+                catch (Exception ex) {
+                    continue; // not a known level name: leave it out of the filter
+                }
+                this.logLevels.put(s, 1); // store the trimmed name, the same string that was just validated
+            }
+        }
+    }
+
+    public void setParameter(String filterParam, String value) {
+        if (filterParams.containsKey(filterParam)) {
+            noFilter = false;
+            filterParams.put(filterParam, value);
+        }
+    }
+
+    public static void defineParameter(String filterParam) {
+        parameters.add(filterParam);
+    }
+
+    /**
+     * Tells whether any filter parameter or log level has been set on this filter.
+     */
+    public boolean isFilterPresent() {
+        return !noFilter || logLevels != null;
+    }
+
+    /**
+     * Checks whether a split log line passes the level filter and the compiled filter pattern.
+     *
+     * @param logParts parts of a log line; index 1 is read as the log level and index 2 as the message
+     * @return true if no levels are set or the level is one of them, and the message matches filterPattern
+     */
+    public boolean matches(ArrayList<String> logParts) {
+        String logLevel = logParts.get(1);
+        String logMessage = logParts.get(2);
+        if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
+            Matcher logMatcher = filterPattern.matcher(logMessage);
+            return logMatcher.matches();
+        }
+        else {
+            return false;
+        }
+    }
+
+    /**
+     * Splits a log line into timestamp, log level and the remaining message using SPLITTER_PATTERN. A line that
+     * does not match the pattern (i.e. one that does not start a new log statement) yields null.
+     *
+     * @param logLine the raw log line to split
+     * @return list of [timestamp, log level, message], or null if the line does not match
+     */
+    public ArrayList<String> splitLogMessage(String logLine) {
+        Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
+        if (splitter.matches()) {
+            ArrayList<String> logParts = new ArrayList<String>();
+            logParts.add(splitter.group(1));// timestamp
+            logParts.add(splitter.group(2));// log level
+            logParts.add(splitter.group(3));// Log Message
+            return logParts;
+        }
+        else {
+            return null;
+        }
+    }
+
+    /**
+     * Builds the regular expression for the configured filters and assigns it to filterPattern. ALLOW_ALL_REGEX is
+     * used if no parameter or log level is set; otherwise the user search text, if any, is appended as regex text.
+     */
+    public void constructPattern() {
+        if (noFilter && logLevels == null) {
+            filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
+            return;
+        }
+        StringBuilder sb = new StringBuilder();
+        if (noFilter) {
+            sb.append("(.*)");
+        }
+        else {
+            sb.append("(.* ");
+            for (int i = 0; i < parameters.size(); i++) {
+                sb.append(parameters.get(i) + "\\[");
+                sb.append(filterParams.get(parameters.get(i)) + "\\] ");
+            }
+            sb.append(".*)");
+        }
+        if (!StringUtils.isEmpty(userLogFilter.getSearchText())) {
+            sb.append(userLogFilter.getSearchText() + ".*");
+        }
+        filterPattern = Pattern.compile(sb.toString());
+    }
+
+    public static void reset() {
+        parameters.clear();
+    }
+
+    @VisibleForTesting
+    public final Map<String, String> getFilterParams() {
+        return filterParams;
+    }
+
+    public XLogUserFilterParam getUserLogFilter() {
+        return userLogFilter;
+    }
+
+    public void setUserLogFilter(XLogUserFilterParam userLogFilter) {
+        this.userLogFilter = userLogFilter;
+        setLogLevel(userLogFilter.getLogLevel());
+    }
+
+    public Date getEndDate() {
+        return endDate;
+    }
+
+    public String getFormattedEndDate() {
+        return formattedEndDate;
+    }
+
+
+    public Date getStartDate() {
+        return startDate;
+    }
+
+    public boolean isDebugMode() {
+        return userLogFilter.isDebug();
+    }
+
+    public int getLogLimit() {
+        return userLogFilter.getLimit();
+    }
+
+    public String getDebugMessage() {
+        return "Log start time = " + getStartDate() + ". Log end time = " + getEndDate() + ". User Log Filter = "
+                + getUserLogFilter() + System.getProperty("line.separator");
+    }
+
+    public boolean isActionList() {
+        return isActionList;
+    }
+
+    public void setActionList(boolean isActionList) {
+        this.isActionList = isActionList;
+    }
+
+    private void calculateScanDate(Date jobStartTime, Date jobEndTime) throws IOException {
+
+        if (userLogFilter.getStartDate() != null) {
+            startDate = userLogFilter.getStartDate();
+        }
+        else if (userLogFilter.getStartOffset() != -1) {
+            startDate = adjustOffset(jobStartTime, userLogFilter.getStartOffset());
+        }
+        else {
+            startDate = jobStartTime;
+        }
+
+        if (userLogFilter.getEndDate() != null) {
+            endDate = userLogFilter.getEndDate();
+        }
+        else if (userLogFilter.getEndOffset() != -1) {
+            // If user has specified startdate as absolute then end offset will be on user start date,
+            // else end offset will be calculated on job startdate.
+            if (userLogFilter.getStartDate() != null) {
+                endDate = adjustOffset(startDate, userLogFilter.getEndOffset());
+            }
+            else {
+                endDate = adjustOffset(jobStartTime, userLogFilter.getEndOffset());
+            }
+        }
+        else {
+            endDate = jobEndTime;
+        }
+        // if recent offset is specified then start time = endtime - offset
+        if (getUserLogFilter().getRecent() != -1) {
+            startDate = adjustOffset(endDate, userLogFilter.getRecent() * -1);
+        }
+
+        // add buffer iff dates are not absolute
+        if (userLogFilter.getStartDate() == null) {
+            startDate = adjustOffset(startDate, -LOG_TIME_BUFFER);
+        }
+        if (userLogFilter.getEndDate() == null) {
+            endDate = adjustOffset(endDate, LOG_TIME_BUFFER);
+        }
+
+        formattedEndDate = XLogUserFilterParam.dt.get().format(getEndDate());
+    }
+
+    /**
+     * Calculate and validate date range.
+     *
+     * @param jobStartTime the job start time
+     * @param jobEndTime the job end time
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public void calculateAndValidateDateRange(Date jobStartTime, Date jobEndTime) throws IOException {
+        // for testcase, otherwise jobStartTime and jobEndTime will be always set
+        if (jobStartTime == null || jobEndTime == null) {
+            return;
+        }
+        calculateScanDate(jobStartTime, jobEndTime);
+
+        if (startDate.after(endDate)) {
+            throw new IOException("Start time should be less than end time. startTime = " + startDate + " endtime = "
+                    + endDate);
+        }
+        long diffHours = (endDate.getTime() - startDate.getTime()) / (60 * 60 * 1000);
+        if (isActionList) {
+            int actionLogDuration = Services.get().getConf().getInt(MAX_ACTIONLIST_SCAN_DURATION, -1);
+            if (actionLogDuration == -1) {
+                return;
+            }
+            if (diffHours > actionLogDuration) {
+                throw new IOException(
+                        "Request log streaming time range with action list is higher than configured. Please reduce the scan "
+                                + "time range. Input range (hours) = " + diffHours
+                                + " system allowed (hours) with action list = " + actionLogDuration);
+            }
+        }
+        else {
+            int logDuration = Services.get().getConf().getInt(MAX_SCAN_DURATION, -1);
+            if (logDuration == -1) {
+                return;
+            }
+            if (diffHours > logDuration) {
+                throw new IOException(
+                        "Request log streaming time range is higher than configured. Please reduce the scan time range. For coord"
+                                + " jobs you can provide action list to reduce log scan time range. Input range (hours) = "
+                                + diffHours + " system allowed (hours) = " + logDuration);
+            }
+        }
+    }
+
+    /**
+     * Adjust offset, offset will always be in min.
+     *
+     * @param date the date
+     * @param offset the offset
+     * @return the date
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public Date adjustOffset(Date date, int offset) throws IOException {
+        return org.apache.commons.lang.time.DateUtils.addMinutes(date, offset);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
index bc6b009..713219b 100644
--- a/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
+++ b/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
@@ -24,14 +24,8 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
-import com.google.common.annotations.VisibleForTesting;
-
 import java.io.BufferedReader;
 
 /**
@@ -39,154 +33,12 @@ import java.io.BufferedReader;
  */
 public class XLogStreamer {
     private static XLog LOG = XLog.getLog(XLogStreamer.class);
-
-    /**
-     * Filter that will construct the regular expression that will be used to filter the log statement. And also checks
-     * if the given log message go through the filter. Filters that can be used are logLevel(Multi values separated by
-     * "|") jobId appName actionId token
-     */
-    public static class Filter {
-        private Map<String, Integer> logLevels;
-        private final Map<String, String> filterParams;
-        private static List<String> parameters = new ArrayList<String>();
-        private boolean noFilter;
-        private Pattern filterPattern;
-
-        // TODO Patterns to be read from config file
-        private static final String DEFAULT_REGEX = "[^\\]]*";
-
-        public static final String ALLOW_ALL_REGEX = "(.*)";
-        private static final String TIMESTAMP_REGEX = "(\\d\\d\\d\\d-\\d\\d-\\d\\d \\d\\d:\\d\\d:\\d\\d,\\d\\d\\d)";
-        private static final String WHITE_SPACE_REGEX = "\\s+";
-        private static final String LOG_LEVEL_REGEX = "(\\w+)";
-        private static final String PREFIX_REGEX = TIMESTAMP_REGEX + WHITE_SPACE_REGEX + LOG_LEVEL_REGEX
-                + WHITE_SPACE_REGEX;
-        private static final Pattern SPLITTER_PATTERN = Pattern.compile(PREFIX_REGEX + ALLOW_ALL_REGEX);
-
-        public Filter() {
-            filterParams = new HashMap<String, String>();
-            for (int i = 0; i < parameters.size(); i++) {
-                filterParams.put(parameters.get(i), DEFAULT_REGEX);
-            }
-            logLevels = null;
-            noFilter = true;
-            filterPattern = null;
-        }
-
-        public void setLogLevel(String logLevel) {
-            if (logLevel != null && logLevel.trim().length() > 0) {
-                this.logLevels = new HashMap<String, Integer>();
-                String[] levels = logLevel.split("\\|");
-                for (int i = 0; i < levels.length; i++) {
-                    String s = levels[i].trim().toUpperCase();
-                    try {
-                        XLog.Level.valueOf(s);
-                    }
-                    catch (Exception ex) {
-                        continue;
-                    }
-                    this.logLevels.put(levels[i].toUpperCase(), 1);
-                }
-            }
-        }
-
-        public void setParameter(String filterParam, String value) {
-            if (filterParams.containsKey(filterParam)) {
-                noFilter = false;
-                filterParams.put(filterParam, value);
-            }
-        }
-
-        public static void defineParameter(String filterParam) {
-            parameters.add(filterParam);
-        }
-
-        public boolean isFilterPresent() {
-            if (noFilter && logLevels == null) {
-                return false;
-            }
-            return true;
-        }
-
-        /**
-         * Checks if the logLevel and logMessage goes through the logFilter.
-         *
-         * @param logParts
-         * @return
-         */
-        public boolean matches(ArrayList<String> logParts) {
-            String logLevel = logParts.get(1);
-            String logMessage = logParts.get(2);
-            if (this.logLevels == null || this.logLevels.containsKey(logLevel.toUpperCase())) {
-                Matcher logMatcher = filterPattern.matcher(logMessage);
-                return logMatcher.matches();
-            }
-            else {
-                return false;
-            }
-        }
-
-        /**
-         * Splits the log line into timestamp, logLevel and remaining log message. Returns array containing timestamp, logLevel, and
-         * logMessage if the pattern matches i.e A new log statement, else returns null.
-         *
-         * @param logLine
-         * @return Array containing log level and log message
-         */
-        public ArrayList<String> splitLogMessage(String logLine) {
-            Matcher splitter = SPLITTER_PATTERN.matcher(logLine);
-            if (splitter.matches()) {
-                ArrayList<String> logParts = new ArrayList<String>();
-                logParts.add(splitter.group(1));// timestamp
-                logParts.add(splitter.group(2));// log level
-                logParts.add(splitter.group(3));// Log Message
-                return logParts;
-            }
-            else {
-                return null;
-            }
-        }
-
-        /**
-         * Constructs the regular expression according to the filter and assigns it to fileterPattarn. ".*" will be
-         * assigned if no filters are set.
-         */
-        public void constructPattern() {
-            if (noFilter && logLevels == null) {
-                filterPattern = Pattern.compile(ALLOW_ALL_REGEX);
-                return;
-            }
-            StringBuilder sb = new StringBuilder();
-            if (noFilter) {
-                sb.append("(.*)");
-            }
-            else {
-                sb.append("(.* ");
-                for (int i = 0; i < parameters.size(); i++) {
-                    sb.append(parameters.get(i) + "\\[");
-                    sb.append(filterParams.get(parameters.get(i)) + "\\] ");
-                }
-                sb.append(".*)");
-            }
-            filterPattern = Pattern.compile(sb.toString());
-        }
-
-        public static void reset() {
-            parameters.clear();
-        }
-
-        @VisibleForTesting
-        public final Map<String, String> getFilterParams() {
-          return filterParams;
-        }
-    }
-
     private String logFile;
     private String logPath;
-    private Filter logFilter;
+    private XLogFilter logFilter;
     private long logRotation;
 
-    public XLogStreamer(Filter logFilter, String logPath, String logFile, long logRotationSecs) {
+    public XLogStreamer(XLogFilter logFilter, String logPath, String logFile, long logRotationSecs) {
         this.logFilter = logFilter;
         if (logFile == null) {
             logFile = "oozie-app.log";
@@ -207,8 +59,11 @@ public class XLogStreamer {
      */
     public void streamLog(Writer writer, Date startTime, Date endTime, int bufferLen) throws IOException {
         // Get a Reader for the log file(s)
-        BufferedReader reader = makeReader(startTime, endTime);
+        BufferedReader reader = new BufferedReader(getReader(startTime, endTime));
         try {
+            if(logFilter.isDebugMode()){
+                writer.write(logFilter.getDebugMessage());
+            }
             // Process the entire logs from the reader using the logFilter
             new TimestampedMessageParser(reader, logFilter).processRemaining(writer, bufferLen);
         }
@@ -225,7 +80,25 @@ public class XLogStreamer {
      * @return A BufferedReader for the log files
      * @throws IOException
      */
+
+    private MultiFileReader getReader(Date startTime, Date endTime) throws IOException {
+        logFilter.calculateAndValidateDateRange(startTime, endTime);
+        return new MultiFileReader(getFileList(logFilter.getStartDate(), logFilter.getEndDate()));
+    }
+
     public BufferedReader makeReader(Date startTime, Date endTime) throws IOException {
+        return new BufferedReader(getReader(startTime,endTime));
+    }
+
+    /**
+     * Gets the log file list for specific date range.
+     *
+     * @param startTime the start time
+     * @param endTime the end time
+     * @return log file list
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    private ArrayList<File> getFileList(Date startTime, Date endTime) throws IOException {
         long startTimeMillis = 0;
         long endTimeMillis;
         if (startTime != null) {
@@ -238,10 +111,7 @@ public class XLogStreamer {
             endTimeMillis = endTime.getTime();
         }
         File dir = new File(logPath);
-        ArrayList<File> files = getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
-        // The MultiFileReader is a Reader that treats the files as one source so we can easily go through them all with one
-        // BufferedReader
-        return new BufferedReader(new MultiFileReader(files));
+        return getFileList(dir, startTimeMillis, endTimeMillis, logRotation, logFile);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/java/org/apache/oozie/util/XLogUserFilterParam.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogUserFilterParam.java b/core/src/main/java/org/apache/oozie/util/XLogUserFilterParam.java
new file mode 100644
index 0000000..6e49e11
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/XLogUserFilterParam.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+
+public class XLogUserFilterParam {
+
+    public static final String START_TIME = "START";
+
+    public static final String END_TIME = "END";
+
+    public static final String SEARCH_TEXT = "TEXT";
+
+    public static final String LOG_LEVEL = "LOGLEVEL";
+
+    public static final String LIMIT = "LIMIT";
+
+    public static final String RECENT_LOG_OFFSET = "RECENT";
+
+    public static final String DEBUG = "DEBUG";
+
+    private Date startTime;
+    private Date endTime;
+    private int startOffset;
+    private int endOffset = -1;
+    private int recent = -1;
+    private String logLevel;
+    private int limit = -1;
+    private boolean isDebug = false;
+    private String searchText;
+
+    public static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() {
+        @Override
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        }
+    };
+
+    static final HashSet<String> LOG_LEVELS = new HashSet<String>();
+    static {
+        LOG_LEVELS.add("ALL");
+        LOG_LEVELS.add("DEBUG");
+        LOG_LEVELS.add("ERROR");
+        LOG_LEVELS.add("INFO");
+        LOG_LEVELS.add("TRACE");
+        LOG_LEVELS.add("WARN");
+        LOG_LEVELS.add("FATAL");
+    }
+
+    public XLogUserFilterParam() {
+    }
+
+    /**
+     * Instantiates a new log user param.
+     *
+     * @param params the params
+     * @throws CommandException the command exception
+     */
+    public XLogUserFilterParam(Map<String, String[]> params) throws CommandException {
+        if (params != null && params.get(RestConstants.LOG_FILTER_OPTION) != null
+                && params.get(RestConstants.LOG_FILTER_OPTION).length > 0) {
+            try {
+                parseFilterParam(params.get(RestConstants.LOG_FILTER_OPTION)[0]);
+            }
+            catch (Exception e) {
+               throw new CommandException(ErrorCode.E0302, e.getMessage());
+
+            }
+        }
+    }
+
+    /**
+     * Parses the ";"-separated key=value pairs of the log filter option into the fields of this object. Only the
+     * first "=" splits key from value, so a value (e.g. the search text) may itself contain "=".
+     *
+     * @param param the raw filter string; null or empty leaves all fields unchanged
+     * @throws Exception if a key is not supported or a value cannot be parsed
+     */
+    private void parseFilterParam(String param) throws Exception {
+        if (StringUtils.isEmpty(param)) {
+            return;
+        }
+        for (String keyValue : param.split(";")) {
+            String[] pairs = keyValue.split("=", 2); // limit 2: keep any further "=" inside the value
+            String key = pairs[0].toUpperCase();
+            String value = pairs.length == 1 ? "" : pairs[1];
+            if (key.equals(START_TIME)) {
+                startTime = getDate(value);
+                if (startTime == null) {
+                    startOffset = getOffsetInMinute(value);
+                }
+            }
+            else if (key.equals(END_TIME)) {
+                endTime = getDate(value);
+                if (endTime == null) {
+                    endOffset = getOffsetInMinute(value);
+                }
+
+            }
+            else if (key.equals(RECENT_LOG_OFFSET)) {
+                recent = getOffsetInMinute(value);
+            }
+            else if (key.equals(LIMIT)) {
+                limit = Integer.parseInt(value);
+
+            }
+            else if (key.equals(LOG_LEVEL)) {
+                logLevel = value;
+                validateLogLevel(logLevel);
+
+            }
+            else if (key.equals(DEBUG)) {
+                isDebug = true;
+
+            }
+            else if (key.equals(SEARCH_TEXT)) {
+                searchText = value;
+
+            }
+            else {
+                throw new Exception("Unsupported log filter " + key);
+            }
+        }
+    }
+
+    /**
+     * Gets the log level.
+     *
+     * @return the log level
+     */
+    public String getLogLevel() {
+        return logLevel;
+
+    }
+
+    /**
+     * Gets the start date.
+     *
+     * @return the start date
+     */
+    public Date getStartDate() {
+        return startTime;
+    }
+
+    /**
+     * Gets the end date.
+     *
+     * @return the end date
+     */
+    public Date getEndDate() {
+        return endTime;
+    }
+
+    /**
+     * Gets the search text.
+     *
+     * @return the search text
+     */
+    public String getSearchText() {
+        return searchText;
+    }
+
+    /**
+     * Validates that every "|"-separated entry of {@code loglevel} is a supported level; null or empty passes.
+     *
+     * @throws CommandException (E0302) if an entry is not contained in LOG_LEVELS
+     */
+    public void validateLogLevel(String loglevel) throws CommandException {
+        if (StringUtils.isEmpty(loglevel)) {
+            return;
+        }
+        for (String level : loglevel.split("\\|")) { // check the argument itself, not the logLevel field
+            if (!LOG_LEVELS.contains(level)) {
+                throw new CommandException(ErrorCode.E0302, "Supported log level are " + LOG_LEVELS.toString());
+            }
+        }
+    }
+
+    /**
+     * Validate search text.
+     *
+     * @throws CommandException the command exception
+     */
+    public void validateSearchText() throws CommandException {
+        // No restriction on search text.
+
+    }
+
+    /**
+     * Parses a date, trying DateUtils.parseDateOozieTZ first and then the "yyyy-MM-dd HH:mm:ss" format of dt.
+     *
+     * @param date the date string to parse
+     * @return the parsed date, or null if neither format matches
+     */
+    public Date getDate(String date) {
+        try {
+            return DateUtils.parseDateOozieTZ(date);
+        }
+        catch (ParseException e) {
+            try {
+                return dt.get().parse(date);
+            }
+            catch (ParseException e1) {
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Checks if is debug.
+     *
+     * @return true, if it's debug
+     */
+    public boolean isDebug() {
+        return isDebug;
+    }
+
+    public Date getEndTime() {
+        return endTime;
+    }
+
+    public int getEndOffset() {
+        return endOffset;
+    }
+
+    public int getRecent() {
+        return recent;
+    }
+
+    public int getLimit() {
+        return limit;
+    }
+
+    public int getStartOffset() {
+        return startOffset;
+    }
+
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append(START_TIME).append("=").append(getStartDate()).append(";");
+        sb.append(END_TIME).append("=").append(getEndDate()).append(";");
+        sb.append(LOG_LEVEL).append("=").append(getLogLevel()).append(";");
+        sb.append(LIMIT).append("=").append(getLimit()).append(";");
+        sb.append(RECENT_LOG_OFFSET).append("=").append(getRecent()).append(";");
+        return sb.toString();
+    }
+
+    /** Converts an offset such as "2", "2h" or "30m" to minutes; a number without a unit is taken as hours. */
+    private int getOffsetInMinute(String offset) throws IOException {
+        if (StringUtils.isEmpty(offset)) {
+            // e.g. "start=" with no value: fail with a clear message instead of letting charAt() throw
+            throw new IOException("Unsupported time : " + offset);
+        }
+        // A purely numeric value carries no unit suffix.
+        if (StringUtils.isNumeric(offset)) {
+            return Integer.parseInt(offset) * 60;
+        }
+        char unit = offset.charAt(offset.length() - 1);
+        String amount = offset.substring(0, offset.length() - 1);
+        switch (unit) {
+            case 'H':
+            case 'h':
+                return Integer.parseInt(amount) * 60;
+            case 'M':
+            case 'm':
+                return Integer.parseInt(amount);
+            default:
+                throw new IOException((Character.isLetter(unit) ? "Unsupported offset " : "Unsupported time : ") + offset);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 70620f6..7ea0d7b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2052,4 +2052,25 @@
         wf.name=,action.name=,action.type=,launcher=true"
         </description>
     </property>
+
+    <property>
+        <name>oozie.service.XLogStreamingService.max.log.scan.duration</name>
+        <value>-1</value>
+        <description>
+        Max log scan duration in hours. If a log scan request has end_date - start_date > value,
+        then an exception is thrown, asking the caller to reduce the scan duration. -1 indicates no limit.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.XLogStreamingService.max.actionlist.log.scan.duration</name>
+        <value>-1</value>
+        <description>
+        Max log scan duration in hours for a coordinator job when a list of actions is specified.
+        If a log streaming request has end_date - start_date > value, then an exception is thrown, asking the caller to reduce the scan duration.
+        -1 indicates no limit.
+        This setting is separate from max.log.scan.duration because we want to allow longer durations when actions are specified.
+        </description>
+    </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
index b4f161a..d711dd9 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
@@ -43,9 +43,10 @@ import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogStreamingService;
 import org.apache.oozie.test.XFsTestCase;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XLogFilter;
 import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XLogUserFilterParam;
 import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XLogStreamer.Filter;
 
 public class TestCoordinatorEngineStreamLog extends XFsTestCase {
     private Services services;
@@ -81,12 +82,12 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase {
     }
 
     static class DummyXLogStreamingService extends XLogStreamingService {
-        Filter filter;
+        XLogFilter filter;
         Date startTime;
         Date endTime;
 
         @Override
-        public void streamLog(Filter filter1, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
+        public void streamLog(XLogFilter filter1, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
                 throws IOException {
             filter = filter1;
             this.startTime = startTime;
@@ -120,7 +121,7 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase {
         // Test 1.to test if fields are injected
         ce.streamLog(jobId, new StringWriter(), new HashMap<String, String[]>());
         DummyXLogStreamingService service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
-        Filter filter = service.filter;
+        XLogFilter filter = service.filter;
         assertEquals(filter.getFilterParams().get(DagXLogInfoService.JOB), jobId);
         assertTrue(endDate.before(service.endTime));
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
index c03385d..145ddbe 100644
--- a/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
@@ -19,7 +19,9 @@ package org.apache.oozie.service;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.XLogFilter;
 import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XLogUserFilterParam;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -27,7 +29,6 @@ import java.io.InputStream;
 import java.io.StringWriter;
 import java.util.HashMap;
 import java.util.Properties;
-import org.apache.oozie.util.XLogStreamer;
 
 public class TestXLogStreamingService extends XTestCase {
 
@@ -128,14 +129,15 @@ public class TestXLogStreamingService extends XTestCase {
     }
 
     public void testNoDashInConversionPattern() throws Exception{
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter(new XLogUserFilterParam(null));
+
         xf.setParameter("USER", "oozie");
         xf.setLogLevel("DEBUG|INFO");
         // Previously, a dash ("-") was always required somewhere in a line in order for that line to pass the filter; this test
@@ -187,10 +189,12 @@ public class TestXLogStreamingService extends XTestCase {
     }
 
     private boolean doStreamDisabledCheck() throws Exception {
-        return doStreamLog(null).equals("Log streaming disabled!!");
+        XLogFilter xf = new XLogFilter(new XLogUserFilterParam(null));
+
+        return doStreamLog(xf).equals("Log streaming disabled!!");
     }
 
-    private String doStreamLog(XLogStreamer.Filter xf) throws Exception {
+    private String doStreamLog(XLogFilter xf) throws Exception {
         StringWriter w = new StringWriter();
         Services.get().get(XLogStreamingService.class).streamLog(xf, null, null, w, new HashMap<String, String[]>());
         return w.toString();

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
index 29bca41..fd48951 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
@@ -28,8 +28,8 @@ import java.util.Properties;
 import org.apache.commons.logging.LogFactory;
 import org.apache.oozie.test.EmbeddedServletContainer;
 import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.util.XLogFilter;
 import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.XLogStreamer;
 import org.apache.oozie.util.ZKUtils;
 
 public class TestZKXLogStreamingService extends ZKXTestCase {
@@ -146,14 +146,14 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
     }
 
     public void testNoDashInConversionPattern() throws Exception{
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
         xf.setParameter("USER", "oozie");
         xf.setLogLevel("DEBUG|INFO");
         // Previously, a dash ("-") was always required somewhere in a line in order for that line to pass the filter; this test
@@ -188,10 +188,10 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
 
     private boolean doStreamDisabledCheck() throws Exception {
         Services.get().get(XLogService.class).init(Services.get());
-        return doStreamLog(new XLogStreamer.Filter()).equals("Log streaming disabled!!");
+        return doStreamLog(new XLogFilter()).equals("Log streaming disabled!!");
     }
 
-    protected String doStreamLog(XLogStreamer.Filter xf) throws Exception {
+    protected String doStreamLog(XLogFilter xf) throws Exception {
         StringWriter w = new StringWriter();
         ZKXLogStreamingService zkxlss = new ZKXLogStreamingService();
         try {
@@ -210,14 +210,14 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
     }
 
     public void testStreamingWithMultipleOozieServers() throws Exception {
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
         xf.setParameter("JOB", "0000003-130610102426873-oozie-rkan-W");
         xf.setLogLevel("WARN|INFO");
         File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java b/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
index 659949f..bc19341 100644
--- a/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
+++ b/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
@@ -27,6 +27,10 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.zip.GZIPOutputStream;
 
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
 import org.apache.oozie.test.XTestCase;
 
 public class TestLogStreamer extends XTestCase {
@@ -36,16 +40,18 @@ public class TestLogStreamer extends XTestCase {
 
     private final static SimpleDateFormat filenameDateFormatter = new SimpleDateFormat("yyyy-MM-dd-HH");
 
-    public void testStreamLog() throws IOException {
+    public void testStreamLog() throws IOException, CommandException, ServiceException {
+        new Services().init();
+
         long currTime = System.currentTimeMillis();
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
         xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
         xf.setLogLevel("DEBUG|INFO");
 
@@ -138,6 +144,9 @@ public class TestLogStreamer extends XTestCase {
         // Test for the log retrieval of the job that began 10 hours before and ended 5 hours before current time
         // respectively
         StringWriter sw = new StringWriter();
+        xf = new XLogFilter();
+        xf.setLogLevel("DEBUG|INFO");
+        xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
         XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
         str.streamLog(sw, new Date(currTime - 10 * 3600000), new Date(currTime - 5 * 3600000), 4096);
         String[] out = sw.toString().split("\n");
@@ -156,6 +165,9 @@ public class TestLogStreamer extends XTestCase {
         // Test to check if the null values for startTime and endTime are translated to 0 and current time respectively
         // and corresponding log content is retrieved properly
         StringWriter sw1 = new StringWriter();
+        xf = new XLogFilter();
+        xf.setLogLevel("DEBUG|INFO");
+        xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
         XLogStreamer str1 = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
         str1.streamLog(sw1, null, null, 4096);
         out = sw1.toString().split("\n");
@@ -173,20 +185,21 @@ public class TestLogStreamer extends XTestCase {
         assertEquals(true, out[7].contains("_L7_"));
     }
 
-    public void testStreamLogMultipleHours() throws IOException {
-
+    public void testStreamLogMultipleHours() throws IOException, CommandException, ServiceException {
+        new Services().init();
         long currTime = System.currentTimeMillis();
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
         xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
         xf.setLogLevel("DEBUG|INFO");
 
+
         // Test to check if all gz log files in the range jobStartTime-currentTime are retrieved
         String outFilename = "oozie.log-2012-04-24-19.gz";
         File f = new File(getTestCaseDir() + "/" + outFilename);
@@ -220,6 +233,9 @@ public class TestLogStreamer extends XTestCase {
 
         // Test for the log retrieval of the job spanning multiple hours
         StringWriter sw2 = new StringWriter();
+        xf = new XLogFilter();
+        xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
+        xf.setLogLevel("DEBUG|INFO");
         XLogStreamer str2 = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
         Calendar calendarEntry = Calendar.getInstance();
         // Setting start-time to 2012-04-24-19 for log stream (month-1 passed as parameter since 0=January), and end time is current time
@@ -239,16 +255,17 @@ public class TestLogStreamer extends XTestCase {
         assertEquals(true, out[4].contains("_L23_"));
     }
 
-    public void testStreamLogNoDash() throws IOException {
+    public void testStreamLogNoDash() throws IOException, CommandException, ServiceException {
+        new Services().init();
         long currTime = System.currentTimeMillis();
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
         xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
         xf.setLogLevel("DEBUG|INFO");
 
@@ -272,6 +289,10 @@ public class TestLogStreamer extends XTestCase {
         f1.setLastModified(currTime);
 
         StringWriter sw = new StringWriter();
+        xf = new XLogFilter();
+        xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
+        xf.setLogLevel("DEBUG|INFO");
+
         XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
         str.streamLog(sw, new Date(currTime - 5000), new Date(currTime + 5000), 4096);
         String[] out = sw.toString().split("\n");

http://git-wip-us.apache.org/repos/asf/oozie/blob/f9176a01/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java b/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
index 75341aa..8deac0b 100644
--- a/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
+++ b/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
@@ -28,14 +28,14 @@ import org.apache.oozie.test.XTestCase;
 public class TestSimplifiedTimestampedMessageParser extends XTestCase {
 
     public void testProcessRemainingLog() throws IOException {
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
         xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
         xf.setLogLevel("DEBUG|WARN");
 
@@ -66,14 +66,14 @@ public class TestSimplifiedTimestampedMessageParser extends XTestCase {
     }
 
     public void testProcessRemainingCoordinatorLogForActions() throws IOException {
-        XLogStreamer.Filter.reset();
-        XLogStreamer.Filter.defineParameter("USER");
-        XLogStreamer.Filter.defineParameter("GROUP");
-        XLogStreamer.Filter.defineParameter("TOKEN");
-        XLogStreamer.Filter.defineParameter("APP");
-        XLogStreamer.Filter.defineParameter("JOB");
-        XLogStreamer.Filter.defineParameter("ACTION");
-        XLogStreamer.Filter xf = new XLogStreamer.Filter();
+        XLogFilter.reset();
+        XLogFilter.defineParameter("USER");
+        XLogFilter.defineParameter("GROUP");
+        XLogFilter.defineParameter("TOKEN");
+        XLogFilter.defineParameter("APP");
+        XLogFilter.defineParameter("JOB");
+        XLogFilter.defineParameter("ACTION");
+        XLogFilter xf = new XLogFilter();
         xf.setParameter("JOB", "14-200904160239--example-C");
         xf.setParameter("ACTION", "14-200904160239--example-C@1");