You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2017/04/12 21:17:43 UTC

[1/3] oozie git commit: OOZIE-2815 Oozie not always display job log

Repository: oozie
Updated Branches:
  refs/heads/master 2237bbd82 -> cf3b400a2


http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java b/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
index f90f784..3253512 100644
--- a/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
+++ b/core/src/test/java/org/apache/oozie/util/TestLogStreamer.java
@@ -22,16 +22,19 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.StringWriter;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.Properties;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
 import org.apache.oozie.test.XTestCase;
 
 public class TestLogStreamer extends XTestCase {
@@ -39,9 +42,20 @@ public class TestLogStreamer extends XTestCase {
     static String logStatement = " USER[oozie] GROUP[-] TOKEN[-] APP[-] "
                 + "JOB[14-200904160239--example-forkjoinwf] ACTION[-] ";
 
+    String defaultLog4jFile;
+
     private final static SimpleDateFormat filenameDateFormatter = new SimpleDateFormat("yyyy-MM-dd-HH");
 
+    @Override
+    protected void tearDown() throws Exception {
+        if (null != defaultLog4jFile) {
+            setSystemProperty(XLogService.LOG4J_FILE, defaultLog4jFile);
+        }
+        super.tearDown();
+    }
+
     public void testStreamLog() throws IOException, CommandException, ServiceException, ParseException {
+        setupLog4j();
         new Services().init();
 
         long currTime = System.currentTimeMillis();
@@ -151,7 +165,7 @@ public class TestLogStreamer extends XTestCase {
         XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
         Calendar cal = Calendar.getInstance();
         cal.set(2009, Calendar.JUNE, 24, 2, 43, 0);
-        str.streamLog(sw, cal.getTime(), new Date(currTime - 5 * 3600000), 4096);
+        str.streamLog(sw, cal.getTime(), new Date(currTime - 5 * 3600000));
         String[] out = sw.toString().split("\n");
         // Check if the retrieved log content is of length seven lines after filtering based on time window, file name
         // pattern and parameters like JobId, Username etc. and/or based on log level like INFO, DEBUG, etc.
@@ -172,7 +186,7 @@ public class TestLogStreamer extends XTestCase {
         xf.setLogLevel("DEBUG|INFO");
         xf.setParameter("JOB", "14-200904160239--example-forkjoinwf");
         XLogStreamer str1 = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
-        str1.streamLog(sw1, null, null, 4096);
+        str1.streamLog(sw1, null, null);
         out = sw1.toString().split("\n");
         // Check if the retrieved log content is of length eight lines after filtering based on time window, file name
         // pattern and parameters like JobId, Username etc. and/or based on log level like INFO, DEBUG, etc.
@@ -189,6 +203,7 @@ public class TestLogStreamer extends XTestCase {
     }
 
     public void testStreamLogMultipleHours() throws IOException, CommandException, ServiceException {
+        setupLog4j();
         new Services().init();
         long currTime = System.currentTimeMillis();
         XLogFilter.reset();
@@ -243,7 +258,7 @@ public class TestLogStreamer extends XTestCase {
         Calendar calendarEntry = Calendar.getInstance();
         // Setting start-time to 2012-04-24-19 for log stream (month-1 passed as parameter since 0=January), and end time is current time
         calendarEntry.set(2012, 3, 24, 19, 0);
-        str2.streamLog(sw2, calendarEntry.getTime(), new Date(System.currentTimeMillis()), 4096);
+        str2.streamLog(sw2, calendarEntry.getTime(), new Date(System.currentTimeMillis()));
         String[] out = sw2.toString().split("\n");
 
         // Check if the retrieved log content is of length five lines after filtering based on time window, file name
@@ -259,6 +274,7 @@ public class TestLogStreamer extends XTestCase {
     }
 
     public void testStreamLogNoDash() throws IOException, CommandException, ServiceException {
+        setupLog4j();
         new Services().init();
         long currTime = System.currentTimeMillis();
         XLogFilter.reset();
@@ -297,7 +313,7 @@ public class TestLogStreamer extends XTestCase {
         xf.setLogLevel("DEBUG|INFO");
 
         XLogStreamer str = new XLogStreamer(xf, getTestCaseDir(), "oozie.log", 1);
-        str.streamLog(sw, null, null, 4096);
+        str.streamLog(sw, null, null);
         String[] out = sw.toString().split("\n");
         // Check if the retrieved log content is of length five lines after filtering; we expect the first five lines because the
         // filtering shouldn't care whether or not there is a dash while the last five lines don't pass the normal filtering
@@ -310,6 +326,16 @@ public class TestLogStreamer extends XTestCase {
         assertEquals(true, out[4].contains("_L5_"));
     }
 
+    public void testBufferLen() throws IOException, CommandException, ServiceException {
+        new Services().init();
+        XLogStreamer str = new XLogStreamer(null, getTestCaseDir(), "oozie.log", 1);
+        assertEquals(4096, str.getBufferLen());
+        str = new XLogErrorStreamer(null);
+        assertEquals(2048, str.getBufferLen());
+        str = new XLogAuditStreamer(null);
+        assertEquals(3, str.getBufferLen());
+    }
+
     static void writeToGZFile(File f, StringBuilder sbr) throws IOException {
         GZIPOutputStream gzout = new GZIPOutputStream(new FileOutputStream(f));
         String strg = sbr.toString();
@@ -318,4 +344,17 @@ public class TestLogStreamer extends XTestCase {
         gzout.write(buf, 0, buf.length);
         gzout.close();
     }
+
+    private void setupLog4j() throws IOException {
+        defaultLog4jFile = System.getProperty(XLogService.LOG4J_FILE);
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        // prevent conflicts with other tests by changing the log file location
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java b/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
index ea899fa..3b2e4b9 100644
--- a/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
+++ b/core/src/test/java/org/apache/oozie/util/TestSimplifiedTimestampedMessageParser.java
@@ -24,11 +24,15 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.StringWriter;
+
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XTestCase;
 
 public class TestSimplifiedTimestampedMessageParser extends XTestCase {
 
-    public void testProcessRemainingLog() throws IOException {
+    public void testProcessRemainingLog() throws IOException, ServiceException {
+        new Services().init();
         XLogFilter.reset();
         XLogFilter.defineParameter("USER");
         XLogFilter.defineParameter("GROUP");
@@ -42,7 +46,8 @@ public class TestSimplifiedTimestampedMessageParser extends XTestCase {
 
         File file = TestTimestampedMessageParser.prepareFile1(getTestCaseDir());
         StringWriter sw = new StringWriter();
-        new SimpleTimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw, 4096);
+        new SimpleTimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw,
+                new XLogStreamer(xf));
         String[] out = sw.toString().split("\n");
         assertEquals(19, out.length);
         assertTrue(out[0].contains("_L1_"));
@@ -66,7 +71,8 @@ public class TestSimplifiedTimestampedMessageParser extends XTestCase {
         assertTrue(out[18].contains("_L17_"));
     }
 
-    public void testProcessRemainingCoordinatorLogForActions() throws IOException {
+    public void testProcessRemainingCoordinatorLogForActions() throws IOException, ServiceException {
+        new Services().init();
         XLogFilter.reset();
         XLogFilter.defineParameter("USER");
         XLogFilter.defineParameter("GROUP");
@@ -80,7 +86,8 @@ public class TestSimplifiedTimestampedMessageParser extends XTestCase {
 
         File file = TestTimestampedMessageParser.prepareFile2(getTestCaseDir());
         StringWriter sw = new StringWriter();
-        new SimpleTimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw, 4096);
+        new SimpleTimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw,
+                new XLogStreamer(xf));
         String[] matches = sw.toString().split("\n");
         assertEquals(9, matches.length);
         assertTrue(matches[0].contains("_L1_"));

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java b/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
index 9e28cbc..859277a 100644
--- a/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
+++ b/core/src/test/java/org/apache/oozie/util/TestTimestampedMessageParser.java
@@ -131,7 +131,8 @@ public class TestTimestampedMessageParser extends XTestCase {
         try {
             File file = prepareFile3(getTestCaseDir());
             StringWriter sw = new StringWriter();
-            new TimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw, 4096);
+            new TimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw,
+                    new XLogStreamer(xf));
             assertTrue(sw.toString().isEmpty());
         }
         catch (Exception e) {
@@ -153,7 +154,8 @@ public class TestTimestampedMessageParser extends XTestCase {
         xf.setLogLevel("DEBUG|WARN");
         File file = prepareFile1(getTestCaseDir());
         StringWriter sw = new StringWriter();
-        new TimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw, 4096);
+        new TimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw,
+                new XLogStreamer(xf));
         String[] out = sw.toString().split("\n");
         assertEquals(14, out.length);
         assertTrue(out[0].contains("_L1_"));
@@ -186,7 +188,8 @@ public class TestTimestampedMessageParser extends XTestCase {
 
         File file = prepareFile2(getTestCaseDir());
         StringWriter sw = new StringWriter();
-        new TimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw, 4096);
+        new TimestampedMessageParser(new BufferedReader(new FileReader(file)), xf).processRemaining(sw,
+                new XLogStreamer(xf));
         String[] matches = sw.toString().split("\n");
         assertEquals(2, matches.length);
         assertTrue(matches[0].contains("_L1_"));

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java b/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java
index 46f273f..4ce4651 100644
--- a/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java
+++ b/core/src/test/java/org/apache/oozie/util/TestXLogUserFilterParam.java
@@ -180,38 +180,6 @@ public class TestXLogUserFilterParam extends XTestCase {
         assertTrue(lines[0].contains("E0803: IO error, Variable substitution depth too large: 20 ${dniInputDir}"));
     }
 
-    // Test logduration, out of range
-    public void testException() throws Exception {
-        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
-        ClassLoader cl = Thread.currentThread().getContextClassLoader();
-        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
-
-        Properties log4jProps = new Properties();
-        log4jProps.load(is);
-        // prevent conflicts with other tests by changing the log file location
-        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
-        log4jProps.store(new FileOutputStream(log4jFile), "");
-        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
-
-        new Services().init();
-        Services.get().getConf().setInt(XLogFilter.MAX_SCAN_DURATION, 10);
-        Map<String, String[]> paramMap = new HashMap<String, String[]>();
-        paramMap.put(RestConstants.LOG_FILTER_OPTION, new String[] {});
-        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(paramMap));
-        Date startDate = new Date();
-        Date endDate = new Date(startDate.getTime() + 60 * 60 * 1000 * 11);
-
-        try {
-            doStreamLog(filter, startDate, endDate);
-            fail("should not come here");
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            assertTrue(e.getMessage().contains(
-                    "Request log streaming time range is higher than configured."));
-        }
-    }
-
     // Test log duration - in range
     public void testNoException_Withrecent() throws Exception {
         File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
@@ -397,7 +365,7 @@ public class TestXLogUserFilterParam extends XTestCase {
     private String doStreamLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
         StringWriter w = new StringWriter();
         Services.get().get(XLogStreamingService.class)
-                .streamLog(xf, startDate, endDate, w, new HashMap<String, String[]>());
+                .streamLog(new XLogStreamer(xf, null), startDate, endDate, w);
         return w.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ffa5c53..e141d0e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2815 Oozie not always display job log (puru)
 OOZIE-2860 Improve Jetty logging (andras.piros via pbacsko)
 OOZIE-2457 Oozie log parsing regex consume more than 90% cpu (satishsaley)
 OOZIE-2844 Increase stability of Oozie actions when log4j.properties is missing or not readable (andras.piros via pbacsko)

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/webapp/src/main/webapp/oozie-console.js
----------------------------------------------------------------------
diff --git a/webapp/src/main/webapp/oozie-console.js b/webapp/src/main/webapp/oozie-console.js
index 76864a9..72c8a19 100644
--- a/webapp/src/main/webapp/oozie-console.js
+++ b/webapp/src/main/webapp/oozie-console.js
@@ -48,7 +48,7 @@ function getLogs(url, searchFilter, logStatus, textArea, shouldParseResponse, er
     if (searchFilter) {
         url = url + "&logfilter=" + searchFilter;
     }
-    logStatus.getEl().dom.innerText = "Log status : Loading... done";
+    logStatus.getEl().dom.innerText = "Log status : Loading...";
 
     if (!errorMsg) {
         errorMsg = "Fatal Error. Can't load logs.";
@@ -96,7 +96,7 @@ function getLogs(url, searchFilter, logStatus, textArea, shouldParseResponse, er
                     logStatus.getEl().dom.innerText = "Log status : Errored out";
 
                 }
-                if (xhr.readyState =4  && xhr.status == 200) {
+                if (xhr.readyState == 4  && xhr.status == 200) {
                     logStatus.getEl().dom.innerText = "Log status : Loading... done";
                 }
             } catch (e) {


[2/3] oozie git commit: OOZIE-2815 Oozie not always display job log

Posted by pu...@apache.org.
OOZIE-2815 Oozie not always display job log


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/993f06df
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/993f06df
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/993f06df

Branch: refs/heads/master
Commit: 993f06df44b05ece89afd85cd662bb325ed5f8a3
Parents: 2237bbd
Author: puru <pu...@gmail.com>
Authored: Wed Apr 12 14:15:36 2017 -0700
Committer: puru <pu...@gmail.com>
Committed: Wed Apr 12 14:15:36 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/oozie/BaseEngine.java  |  85 +++++++-----
 .../java/org/apache/oozie/BundleEngine.java     |  50 ++-----
 .../org/apache/oozie/CoordinatorEngine.java     |  65 ++-------
 .../main/java/org/apache/oozie/DagEngine.java   |  83 ++---------
 .../org/apache/oozie/service/XLogService.java   |  12 +-
 .../oozie/service/XLogStreamingService.java     |  80 +++--------
 .../oozie/service/ZKXLogStreamingService.java   | 138 +++++--------------
 .../org/apache/oozie/servlet/V0JobServlet.java  |   3 +-
 .../org/apache/oozie/servlet/V1JobServlet.java  |   4 +-
 .../org/apache/oozie/util/AuthUrlClient.java    |   3 +-
 .../oozie/util/TimestampedMessageParser.java    |  36 +----
 .../org/apache/oozie/util/XLogAuditFilter.java  |  16 ---
 .../apache/oozie/util/XLogAuditStreamer.java    |  79 +++++++++++
 .../apache/oozie/util/XLogErrorStreamer.java    |  67 +++++++++
 .../java/org/apache/oozie/util/XLogFilter.java  |  76 ++++++----
 .../org/apache/oozie/util/XLogStreamer.java     | 117 ++++++++++++++--
 core/src/main/resources/oozie-default.xml       |  26 +++-
 .../oozie/TestCoordinatorEngineStreamLog.java   |   9 +-
 .../oozie/service/TestConfigurationService.java |   3 +-
 .../oozie/service/TestXLogStreamingService.java |  57 ++++++--
 .../service/TestZKXLogStreamingService.java     |  46 ++++++-
 .../org/apache/oozie/util/TestLogStreamer.java  |  47 ++++++-
 .../TestSimplifiedTimestampedMessageParser.java |  15 +-
 .../util/TestTimestampedMessageParser.java      |   9 +-
 .../oozie/util/TestXLogUserFilterParam.java     |  34 +----
 release-log.txt                                 |   1 +
 webapp/src/main/webapp/oozie-console.js         |   4 +-
 27 files changed, 639 insertions(+), 526 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/BaseEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BaseEngine.java b/core/src/main/java/org/apache/oozie/BaseEngine.java
index 50df897..2780ec2 100644
--- a/core/src/main/java/org/apache/oozie/BaseEngine.java
+++ b/core/src/main/java/org/apache/oozie/BaseEngine.java
@@ -20,25 +20,22 @@ package org.apache.oozie;
 
 import java.io.IOException;
 import java.io.Writer;
-import java.util.Date;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JMSTopicService;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.XLogStreamingService;
-import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogAuditStreamer;
+import org.apache.oozie.util.XLogErrorStreamer;
+import org.apache.oozie.util.XLogStreamer;
 
 public abstract class BaseEngine {
     public static final String USE_XCOMMAND = "oozie.useXCommand";
 
-    public enum LOG_TYPE {
-        LOG, ERROR_LOG, AUDIT_LOG
-    }
-
     protected String user;
 
     /**
@@ -169,39 +166,60 @@ public abstract class BaseEngine {
      *
      * @param jobId job Id.
      * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
+     * @param requestParameters additional parameters from the request
      * @throws IOException thrown if the log cannot be streamed.
      * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
      *         jobId.
      */
-    public abstract void streamLog(String jobId, Writer writer, Map<String, String[]> params)
-            throws IOException, BaseEngineException;
+    public void streamLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException,
+            BaseEngineException {
+        try {
+
+            streamJobLog(new XLogStreamer(requestParameters), jobId, writer);
+        }
+        catch (CommandException e) {
+            throw new IOException(e);
+        }
+    }
 
     /**
      * Stream error log of a job.
      *
      * @param jobId job Id.
      * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
+     * @param requestParameters additional parameters from the request
      * @throws IOException thrown if the log cannot be streamed.
      * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
      *         jobId.
      */
-    public abstract void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
-            BaseEngineException;
-    /**
+    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException,
+            BaseEngineException {
+        try {
+
+            streamJobLog(new XLogErrorStreamer(requestParameters), jobId, writer);
+        }
+        catch (CommandException e) {
+            throw new IOException(e);
+        }
+    }    /**
      * Stream Audit log of a job.
      *
      * @param jobId job Id.
      * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
+     * @param requestParameters additional parameters from the request
      * @throws IOException thrown if the log cannot be streamed.
      * @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
      *         jobId.
      */
-    public abstract void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
-            BaseEngineException;
-
+    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException,
+            BaseEngineException {
+        try {
+            streamJobLog(new XLogAuditStreamer(requestParameters), jobId, writer);
+        }
+        catch (CommandException e) {
+            throw new IOException(e);
+        }
+    }
 
     /**
      * Return the workflow Job ID for an external ID.
@@ -299,25 +317,16 @@ public abstract class BaseEngine {
     public abstract void changeSLA(String id, String actions, String  dates, String childIds, String newParams)
             throws BaseEngineException;
 
-    protected void fetchLog(XLogFilter filter, Date startTime, Date endTime, Writer writer,
-            Map<String, String[]> params, LOG_TYPE logType) throws IOException {
-
-        switch (logType) {
-            case LOG:
-                Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params);
-                break;
-            case ERROR_LOG:
-                Services.get().get(XLogStreamingService.class)
-                        .streamErrorLog(filter, startTime, endTime, writer, params);
-                break;
-            case AUDIT_LOG:
-                Services.get().get(XLogStreamingService.class)
-                        .streamAuditLog(filter, startTime, endTime, writer, params);
-                break;
-            default:
-                throw new IOException("Unsupported log Type");
-        }
-    }
-
+    /**
+     * Stream job log.
+     *
+     * @param logStreamer the log streamer
+     * @param jobId the job id
+     * @param writer the writer
+     * @throws IOException Signals that an I/O exception has occurred.
+     * @throws BaseEngineException the base engine exception
+     */
+    protected abstract void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException,
+            BaseEngineException;
 
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java
index d0099b4..e994bff 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -54,14 +54,14 @@ import org.apache.oozie.command.bundle.BundleSubmitXCommand;
 import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.DagXLogInfoService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogStreamingService;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.JobUtils;
 import org.apache.oozie.util.JobsFilterUtils;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogAuditFilter;
-import org.apache.oozie.util.XLogFilter;
-import org.apache.oozie.util.XLogUserFilterParam;
+import org.apache.oozie.util.XLogStreamer;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -245,44 +245,11 @@ public class BundleEngine extends BaseEngine {
         }
     }
 
-    @Override
-    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
-            BundleEngineException {
-        streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
-    }
-
-    @Override
-    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
+    protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException,
             BundleEngineException {
-        streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
-    }
-
-    @Override
-    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
-            BundleEngineException {
-        try {
-            streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG);
-        }
-        catch (CommandException e) {
-            throw new IOException(e);
-        }
-    }
-
-    private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
-            throws IOException, BundleEngineException {
-        try {
-            streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
-        }
-        catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-
-    private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
-            throws IOException, BundleEngineException {
-        try {
+    try {
             BundleJobBean job;
-            filter.setParameter(DagXLogInfoService.JOB, jobId);
+            logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId);
             job = new BundleJobXCommand(jobId).call();
             Date lastTime = null;
             if (job.isTerminalStatus()) {
@@ -291,9 +258,10 @@ public class BundleEngine extends BaseEngine {
             if (lastTime == null) {
                 lastTime = new Date();
             }
-            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
+            Services.get().get(XLogStreamingService.class)
+                    .streamLog(logStreamer, job.getCreatedTime(), lastTime, writer);
         }
-        catch (Exception ex) {
+        catch (CommandException ex) {
             throw new IOException(ex);
         }
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index 2f9f822..2c04bea 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -76,8 +76,8 @@ import org.apache.oozie.util.JobUtils;
 import org.apache.oozie.util.Pair;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogAuditFilter;
 import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
 import org.apache.oozie.util.XLogUserFilterParam;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -303,56 +303,20 @@ public class CoordinatorEngine extends BaseEngine {
         throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
     }
 
-    @Override
-    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
-            BaseEngineException {
-        streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
-    }
-
-    @Override
-    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
-            BaseEngineException {
-        streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
-    }
 
     @Override
-    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
-    BaseEngineException {
-        try {
-            streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG);
-        }
-        catch (CommandException e) {
-            throw new IOException(e);
-        }
-    }
-
-    private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
+    protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer)
             throws IOException, BaseEngineException {
-        try {
-            streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
-        }
-        catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-
-    private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
-            throws IOException, BaseEngineException {
-        try {
-            filter.setParameter(DagXLogInfoService.JOB, jobId);
-            Date lastTime = null;
-            CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
-            if (job.isTerminalStatus()) {
-                lastTime = job.getLastModifiedTime();
-            }
-            if (lastTime == null) {
-                lastTime = new Date();
-            }
-            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
+        logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId);
+        Date lastTime = null;
+        CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
+        if (job.isTerminalStatus()) {
+            lastTime = job.getLastModifiedTime();
         }
-        catch (Exception e) {
-            throw new IOException(e);
+        if (lastTime == null) {
+            lastTime = new Date();
         }
+        Services.get().get(XLogStreamingService.class).streamLog(logStreamer, job.getCreatedTime(), lastTime, writer);
     }
 
     /**
@@ -362,17 +326,17 @@ public class CoordinatorEngine extends BaseEngine {
      * @param logRetrievalScope Value for the retrieval type
      * @param logRetrievalType Based on which filter criteria the log is retrieved
      * @param writer writer to stream the log to
-     * @param params additional parameters from the request
+     * @param requestParameters additional parameters from the request
      * @throws IOException
      * @throws BaseEngineException
      * @throws CommandException
      */
     public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer,
-            Map<String, String[]> params) throws IOException, BaseEngineException, CommandException {
+            Map<String, String[]> requestParameters) throws IOException, BaseEngineException, CommandException {
 
         Date startTime = null;
         Date endTime = null;
-        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
+        XLogFilter filter = new XLogFilter(new XLogUserFilterParam(requestParameters));
 
         filter.setParameter(DagXLogInfoService.JOB, jobId);
         if (logRetrievalScope != null && logRetrievalType != null) {
@@ -528,7 +492,8 @@ public class CoordinatorEngine extends BaseEngine {
                 }
             }
         }
-        Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params);
+        Services.get().get(XLogStreamingService.class).streamLog(new XLogStreamer(filter, requestParameters), startTime,
+                endTime, writer);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/DagEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java
index 57d2761..36198b5 100644
--- a/core/src/main/java/org/apache/oozie/DagEngine.java
+++ b/core/src/main/java/org/apache/oozie/DagEngine.java
@@ -63,13 +63,12 @@ import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogService;
+import org.apache.oozie.service.XLogStreamingService;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XCallable;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogAuditFilter;
-import org.apache.oozie.util.XLogFilter;
-import org.apache.oozie.util.XLogUserFilterParam;
+import org.apache.oozie.util.XLogStreamer;
 
 /**
  * The DagEngine provides all the DAG engine functionality for WS calls.
@@ -389,80 +388,16 @@ public class DagEngine extends BaseEngine {
         }
     }
 
-    /**
-     * Stream the log of a job.
-     *
-     * @param jobId job Id.
-     * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
-     * @throws IOException thrown if the log cannot be streamed.
-     * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
-     */
     @Override
-    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
-            DagEngineException {
-        streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
-    }
-
-    /**
-     * Stream the error log of a job.
-     *
-     * @param jobId job Id.
-     * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
-     * @throws IOException thrown if the log cannot be streamed.
-     * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
-     */
-    @Override
-    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
-            DagEngineException {
-        streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
-    }
-
-    /**
-     * Stream the audit log of a job.
-     *
-     * @param jobId job Id.
-     * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
-     * @throws IOException thrown if the log cannot be streamed.
-     * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
-     */
-    @Override
-    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
-            DagEngineException {
-        try {
-            streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)),jobId, writer, params, LOG_TYPE.AUDIT_LOG);
-        }
-        catch (CommandException e) {
-            throw new IOException(e);
-        }
-    }
-
-    private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
-            throws IOException, DagEngineException {
-        try {
-            streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
-        }
-        catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-
-    private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
+    protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer)
             throws IOException, DagEngineException {
-        try {
-            filter.setParameter(DagXLogInfoService.JOB, jobId);
-            WorkflowJob job = getJob(jobId);
-            Date lastTime = job.getEndTime();
-            if (lastTime == null) {
-                lastTime = job.getLastModifiedTime();
-            }
-            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
-        }
-        catch (Exception e) {
-            throw new IOException(e);
+        logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId);
+        WorkflowJob job = getJob(jobId);
+        Date lastTime = job.getEndTime();
+        if (lastTime == null) {
+            lastTime = job.getLastModifiedTime();
         }
+        Services.get().get(XLogStreamingService.class).streamLog(logStreamer, job.getCreatedTime(), lastTime, writer);
     }
 
     private static final Set<String> FILTER_NAMES = new HashSet<String>();

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/service/XLogService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/XLogService.java b/core/src/main/java/org/apache/oozie/service/XLogService.java
index 04f04f4..0b43722 100644
--- a/core/src/main/java/org/apache/oozie/service/XLogService.java
+++ b/core/src/main/java/org/apache/oozie/service/XLogService.java
@@ -295,23 +295,23 @@ public class XLogService implements Service, Instrumentable {
         });
     }
 
-    boolean getLogOverWS() {
+    public boolean getLogOverWS() {
         return logOverWS;
     }
 
-    boolean isErrorLogEnabled(){
+    public boolean isErrorLogEnabled(){
         return errorLogEnabled;
     }
 
-    int getOozieLogRotation() {
+    public int getOozieLogRotation() {
         return oozieLogRotation;
     }
 
-    int getOozieErrorLogRotation() {
+    public int getOozieErrorLogRotation() {
         return oozieErrorLogRotation;
     }
 
-    int getOozieAuditLogRotation() {
+    public int getOozieAuditLogRotation() {
         return oozieAuditLogRotation;
     }
 
@@ -323,7 +323,7 @@ public class XLogService implements Service, Instrumentable {
         return oozieAuditLogName;
     }
 
-    boolean isAuditLogEnabled() {
+    public boolean isAuditLogEnabled() {
         return auditLogEnabled;
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
index c15c4c1..40ba46e 100644
--- a/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
@@ -18,24 +18,19 @@
 
 package org.apache.oozie.service;
 
-import org.apache.oozie.util.XLogFilter;
+import org.apache.commons.lang.StringUtils;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.XLogStreamer;
-
 import java.io.IOException;
 import java.io.Writer;
-import java.util.Map;
 import java.util.Date;
 
 /**
  * Service that performs streaming of log files over Web Services if enabled in XLogService
  */
 public class XLogStreamingService implements Service, Instrumentable {
-    private static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService.";
-    public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
 
-    protected int bufferLen;
 
     /**
      * Initialize the log streaming service.
@@ -44,7 +39,6 @@ public class XLogStreamingService implements Service, Instrumentable {
      * @throws ServiceException thrown if the log streaming service could not be initialized.
      */
     public void init(Services services) throws ServiceException {
-        bufferLen = ConfigurationService.getInt(services.getConf(), STREAM_BUFFER_LEN);
     }
 
     /**
@@ -71,75 +65,37 @@ public class XLogStreamingService implements Service, Instrumentable {
         // nothing to instrument
     }
 
-    /**
-     * Stream the log of a job.
-     *
-     * @param filter log streamer filter.
-     * @param startTime start time for log events to filter.
-     * @param endTime end time for log events to filter.
-     * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
-     * @throws IOException thrown if the log cannot be streamed.
-     */
-    public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
-            throws IOException {
-        XLogService xLogService = Services.get().get(XLogService.class);
-        if (xLogService.getLogOverWS()) {
-            new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(),
-                    xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
-        }
-        else {
-            writer.write("Log streaming disabled!!");
-        }
-    }
 
     /**
-     * Stream the error log of a job.
+     * Stream the log of a job.
      *
+     * @param logStreamer the log streamer
      * @param filter log streamer filter.
      * @param startTime start time for log events to filter.
      * @param endTime end time for log events to filter.
      * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
-     * @throws IOException thrown if the log cannot be streamed.
+     * @throws IOException Signals that an I/O exception has occurred.
      */
-    public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
-            throws IOException {
-        XLogService xLogService = Services.get().get(XLogService.class);
-        if (xLogService.isErrorLogEnabled()) {
-            new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(),
-                    xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
-        }
-        else {
-            writer.write("Error Log is disabled!!");
-        }
+    public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException {
+        streamLog(logStreamer, startTime, endTime, writer, true);
     }
 
     /**
-     * Stream the audit log of a job.
+     * Stream the log via the given streamer, or write its disabled message if that log is not enabled.
      *
-     * @param filter log streamer filter.
-     * @param startTime start time for log events to filter.
-     * @param endTime end time for log events to filter.
-     * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
-     * @throws IOException thrown if the log cannot be streamed.
+     * @param logStreamer the log streamer
+     * @param startTime the start time
+     * @param endTime the end time
+     * @param writer the writer
+     * @param appendDebug forwarded as-is to {@link XLogStreamer#streamLog}; presumably whether to append debug info (confirm)
+     * @throws IOException Signals that an I/O exception has occurred.
      */
-    public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
+    protected void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer, boolean appendDebug)
             throws IOException {
-        XLogService xLogService = Services.get().get(XLogService.class);
-        if (xLogService.isAuditLogEnabled()) {
-            new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(),
-                    xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
-        }
-        else {
-            writer.write("Audit Log is disabled!!");
+        if (!logStreamer.isLogEnabled()) {
+            writer.write(logStreamer.getLogDisableMessage());
+            return;
         }
-    }
-
-
-
-    public int getBufferLen() {
-        return bufferLen;
+        logStreamer.streamLog(writer, startTime, endTime, appendDebug);
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
index 97771ad..3a5081c 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient;
@@ -37,7 +38,6 @@ import org.apache.oozie.util.Instrumentation;
 import org.apache.oozie.util.SimpleTimestampedMessageParser;
 import org.apache.oozie.util.TimestampedMessageParser;
 import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogFilter;
 import org.apache.oozie.util.XLogStreamer;
 import org.apache.oozie.util.ZKUtils;
 
@@ -93,119 +93,42 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
     /**
      * Stream the log of a job.  It contacts any other running Oozie servers to collate relevant logs while streaming.
      *
-     * @param filter log streamer filter.
+     * @param logStreamer the log streamer
      * @param startTime start time for log events to filter.
      * @param endTime end time for log events to filter.
      * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
      * @throws IOException thrown if the log cannot be streamed.
      */
     @Override
-    public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
-            throws IOException {
-        XLogService xLogService = Services.get().get(XLogService.class);
-        if (xLogService.getLogOverWS()) {
-            // If ALL_SERVERS_PARAM is set to false, then only stream our log
-            if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
-                new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(),
-                        xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
-            }
-            // Otherwise, we have to go collate relevant logs from the other Oozie servers
-            else {
-                collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(),
-                        xLogService.getOozieLogName(), xLogService.getOozieLogRotation(), RestConstants.JOB_SHOW_LOG);
-            }
-        }
-        else {
-            writer.write("Log streaming disabled!!");
-        }
-    }
+    public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException {
 
-    /**
-     * Stream the error log of a job.  It contacts any other running Oozie servers to collate relevant error logs while streaming.
-     *
-     * @param filter log streamer filter.
-     * @param startTime start time for log events to filter.
-     * @param endTime end time for log events to filter.
-     * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
-     * @throws IOException thrown if the log cannot be streamed.
-     */
-    public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
-            throws IOException {
-        XLogService xLogService = Services.get().get(XLogService.class);
-        if (xLogService.isErrorLogEnabled()) {
-            // If ALL_SERVERS_PARAM is set to false, then only stream our log
-            if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
-                new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(),
-                        xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
-            }
-            // Otherwise, we have to go collate relevant logs from the other Oozie servers
-            else {
-                collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(),
-                        xLogService.getOozieErrorLogName(), xLogService.getOozieErrorLogRotation(),
-                        RestConstants.JOB_SHOW_ERROR_LOG);
-            }
-        }
-        else {
-            writer.write("Error Log streaming disabled!!");
+        if (!logStreamer.isLogEnabled()) {
+            writer.write(logStreamer.getLogDisableMessage());
+            return;
         }
-    }
-
-    /**
-     * Stream the audit log of a job.  It contacts any other running Oozie servers to collate relevant audit logs while streaming.
-     *
-     * @param filter log streamer filter.
-     * @param startTime start time for log events to filter.
-     * @param endTime end time for log events to filter.
-     * @param writer writer to stream the log to.
-     * @param params additional parameters from the request
-     * @throws IOException thrown if the log cannot be streamed.
-     */
-    @Override
-    public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
-            throws IOException {
-        XLogService xLogService = Services.get().get(XLogService.class);
-        if (xLogService.isAuditLogEnabled()) {
-            // If ALL_SERVERS_PARAM is set to false, then only stream our log
-            if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
-                new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(),
-                        xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
-            }
-            // Otherwise, we have to go collate relevant logs from the other Oozie servers
-            else {
-                collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieAuditLogPath(),
-                        xLogService.getOozieAuditLogName(), xLogService.getOozieAuditLogRotation(),
-                        RestConstants.JOB_SHOW_AUDIT_LOG);
-            }
+        // If ALL_SERVERS_PARAM is set to false, then only stream our log
+        if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(logStreamer.getRequestParam())) {
+            super.streamLog(logStreamer, startTime, endTime, writer, false);
         }
+        // Otherwise, we have to go collate relevant logs from the other Oozie servers
         else {
-            writer.write("Audit Log streaming disabled!!");
+            collateLogs(logStreamer, startTime, endTime, writer);
         }
     }
 
-
-
     /**
      * Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the
      * Writer.  It will make sure to not read all of the log messages into memory at the same time to not use up the heap.  If there
      * is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it.
      * For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient.
      *
-     * @param filter the job filter
+     * @param logStreamer supplies the local log reader, the log filter, the log type and the request parameters
      * @param startTime the job start time
      * @param endTime the job end time
      * @param writer the writer
-     * @param params the params
-     * @param logPath the log path
-     * @param logName the log name
-     * @param rotation the rotation
-     * @param logType the log type
      * @throws IOException Signals that an I/O exception has occurred.
      */
-    private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer,
-            Map<String, String[]> params, String logPath, String logName, int rotation, final String logType) throws IOException {
-        XLogService xLogService = Services.get().get(XLogService.class);
+    private void collateLogs(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException {
         List<String> badOozies = new ArrayList<String>();
         List<ServiceInstance<Map>> oozies = null;
         try {
@@ -222,24 +145,28 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
                 String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
                 // If it's this server, we can just get them directly
                 if (otherId.equals(zk.getZKId())) {
-                    BufferedReader reader = new XLogStreamer(filter, logPath, logName, rotation).makeReader(startTime,
+                    BufferedReader reader = logStreamer.makeReader(startTime,
                             endTime);
-                    parsers.add(new TimestampedMessageParser(reader, filter));
+                    parsers.add(new TimestampedMessageParser(reader, logStreamer.getXLogFilter()));
                 }
                 // If it's another server, we'll have to use the REST API
                 else {
                     String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
-                    String jobId = filter.getFilterParams().get(DagXLogInfoService.JOB);
+                    String jobId = logStreamer.getXLogFilter().getFilterParams().get(DagXLogInfoService.JOB);
                     try {
                      // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
                      // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
                         final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
-                                + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logType
-                                + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params);
+                                + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logStreamer.getLogType()
+                                + "&" + RestConstants.ALL_SERVER_REQUEST + "=false"
+                                + AuthUrlClient.getQueryParamString(logStreamer.getRequestParam());
                         // remove doAs from url to avoid failure while fetching
                         // logs in case of HA mode
                         String key = "doAs";
-                        String[] value = params.get(key);
+                        String[] value = null;
+                        if (logStreamer.getRequestParam() != null) {
+                            value = logStreamer.getRequestParam().get(key);
+                        }
                         String urlWithoutdoAs = null;
                         if (value != null && value.length > 0 && value[0] != null && value[0].length() > 0) {
                             urlWithoutdoAs = url.replace("&" + key + "=" + URLEncoder.encode(value[0], "UTF-8"), "");
@@ -248,7 +175,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
                             urlWithoutdoAs = url;
                         }
                         BufferedReader reader = AuthUrlClient.callServer(urlWithoutdoAs);
-                        parsers.add(new SimpleTimestampedMessageParser(reader, filter));
+                        parsers.add(new SimpleTimestampedMessageParser(reader, logStreamer.getXLogFilter()));
                     }
                     catch(IOException ioe) {
                         log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId
@@ -259,10 +186,13 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
             }
 
             //If log param debug is set, we need to write start date and end date to outputstream.
-            if(filter.isDebugMode()){
-                writer.write(filter.getDebugMessage());
+            if(!StringUtils.isEmpty(logStreamer.getXLogFilter().getTruncatedMessage())){
+                writer.write(logStreamer.getXLogFilter().getTruncatedMessage());
             }
 
+            if (logStreamer.getXLogFilter().isDebugMode()) {
+                writer.write(logStreamer.getXLogFilter().getDebugMessage());
+            }
             // Add a message about any servers we couldn't contact
             if (!badOozies.isEmpty()) {
                 writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
@@ -278,7 +208,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
             // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly
             if (parsers.size() == 1) {
                 TimestampedMessageParser parser = parsers.get(0);
-                parser.processRemaining(writer, bufferLen);
+                parser.processRemaining(writer, logStreamer);
             }
             else {
                 // Now that we have a Reader for each server to get the logs from that server, we have to collate them.  Within each
@@ -292,16 +222,13 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
                         timestampMap.put(parser.getLastTimestamp(), parser);
                     }
                 }
-                int bytesWritten = 0;
                 while (timestampMap.size() > 1) {
                     // The first entry will be the earliest based on the timestamp (also removes it) from the map
                     TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue();
                     // Write the message from that parser at that timestamp
                     writer.write(earliestParser.getLastMessage());
-                    bytesWritten = earliestParser.getLastMessage().length();
-                    if (bytesWritten > bufferLen) {
+                    if (logStreamer.shouldFlushOutput(earliestParser.getLastMessage().length())) {
                         writer.flush();
-                        bytesWritten = 0;
                     }
                     // Increment that parser to read the next message
                     if (earliestParser.increment()) {
@@ -313,7 +240,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
                 if (timestampMap.size() == 1) {
                     TimestampedMessageParser parser = timestampMap.values().iterator().next();
                     writer.write(parser.getLastMessage());  // don't forget the last message read by the parser
-                    parser.processRemaining(writer, bufferLen, bytesWritten + parser.getLastMessage().length());
+                    parser.processRemaining(writer, logStreamer);
                 }
             }
         }
@@ -321,7 +248,6 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
             for (TimestampedMessageParser parser : parsers) {
                 parser.closeReader();
             }
-            writer.flush();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
index d3b4689..39c7b85 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
@@ -24,6 +24,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.BaseEngineException;
 import org.apache.oozie.DagEngine;
 import org.apache.oozie.DagEngineException;
 import org.apache.oozie.client.rest.JsonBean;
@@ -192,7 +193,7 @@ public class V0JobServlet extends BaseJobServlet {
         try {
             dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
         }
-        catch (DagEngineException ex) {
+        catch (BaseEngineException ex) {
             throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
         }
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
index 9356768..10812c6 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
@@ -990,7 +990,7 @@ public class V1JobServlet extends BaseJobServlet {
         try {
             dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
         }
-        catch (DagEngineException ex) {
+        catch (BaseEngineException ex) {
             throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
         }
     }
@@ -1009,7 +1009,7 @@ public class V1JobServlet extends BaseJobServlet {
         try {
             bundleEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
         }
-        catch (BundleEngineException ex) {
+        catch (BaseEngineException ex) {
             throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
         }
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java b/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
index b45a96a..4fc8f57 100644
--- a/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
+++ b/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
@@ -37,11 +37,11 @@ import org.apache.hadoop.security.authentication.client.Authenticator;
 import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
 import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
 import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.Services;
 
 public class AuthUrlClient {
 
     public static final String SERVER_SERVER_AUTH_TYPE = "oozie.server.authentication.type";
+    public static final String SERVER_SERVER_CONNECTION_TIMEOUT_SECONDS = "oozie.server.connection.timeout.seconds";
 
     private static XLog LOG = XLog.getLog(AuthUrlClient.class);
 
@@ -129,6 +129,9 @@ public class AuthUrlClient {
                 @Override
                 public BufferedReader run() throws IOException {
                     HttpURLConnection conn = getConnection(url);
+                    // The setting is expressed in seconds, but setConnectTimeout() expects milliseconds.
+                    int timeoutSecs = ConfigurationService.getInt(SERVER_SERVER_CONNECTION_TIMEOUT_SECONDS, 180);
+                    conn.setConnectTimeout(timeoutSecs * 1000);
                     BufferedReader reader = null;
                     if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
                         InputStream is = conn.getInputStream();

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java b/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
index 830f287..7fa8a60 100644
--- a/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
+++ b/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
@@ -194,50 +194,24 @@ public class TimestampedMessageParser {
     }
 
     /**
-     * Streams log messages to the passed in Writer. Flushes the log writing
-     * based on buffer len
+     * Streams the remaining log messages to the passed in Writer, flushing
+     * whenever the log streamer reports that enough output has accumulated
      *
      * @param writer
-     * @param bufferLen maximum len of log buffer
-     * @param bytesWritten num bytes already written to writer
+     * @param logStreamer the log streamer
      * @throws IOException
      */
-    public void processRemaining(Writer writer, int bufferLen, int bytesWritten) throws IOException {
+    public void processRemaining(Writer writer, XLogStreamer logStreamer) throws IOException {
         while (increment()) {
             writer.write(lastMessage);
-            bytesWritten += lastMessage.length();
-            if (bytesWritten > bufferLen) {
+            if (logStreamer.shouldFlushOutput(lastMessage.length())) {
                 writer.flush();
-                bytesWritten = 0;
             }
         }
         writer.flush();
     }
 
     /**
-     * Streams log messages to the passed in Writer, with zero bytes already
-     * written
-     *
-     * @param writer
-     * @param bufferLen maximum len of log buffer
-     * @throws IOException
-     */
-    public void processRemaining(Writer writer, int bufferLen) throws IOException {
-        processRemaining(writer, bufferLen, 0);
-    }
-
-    /**
-     * Streams log messages to the passed in Writer, with default buffer len 4K
-     * and zero bytes already written
-     *
-     * @param writer
-     * @throws IOException
-     */
-    public void processRemaining(Writer writer) throws IOException {
-        processRemaining(writer, Services.get().get(XLogStreamingService.class).getBufferLen());
-    }
-
-    /**
      * Splits the log message into parts
      *
      * @param line

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java b/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java
index c377db5..465d808 100644
--- a/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java
+++ b/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java
@@ -17,8 +17,6 @@
  */
 package org.apache.oozie.util;
 
-import java.io.IOException;
-import java.util.Date;
 import java.util.regex.Pattern;
 
 import org.apache.oozie.service.DagXLogInfoService;
@@ -29,20 +27,6 @@ public class XLogAuditFilter extends XLogFilter {
         super(xLogUserFilterParam);
     }
 
-    @Override
-    public void calculateAndValidateDateRange(Date jobStartTime, Date jobEndTime) throws IOException {
-        // for testcase, otherwise jobStartTime and jobEndTime will be always set
-        if (jobStartTime == null || jobEndTime == null) {
-            return;
-        }
-
-        if (jobStartTime.after(jobEndTime)) {
-            throw new IOException("Start time should be less than end time. startTime = " + jobStartTime
-                    + " endtime = " + jobEndTime);
-        }
-
-    }
-
     public void constructPattern() {
         // audit log will only need to scan on jobID
         StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java
new file mode 100644
index 0000000..6b27ae3
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
+
+/**
+ * Streams the Oozie audit log; output is flushed by number of lines written rather than by number of bytes.
+ */
+public class XLogAuditStreamer extends XLogStreamer {
+    /** Configuration key for the number of audit log lines written between flushes. */
+    public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "audit.buffer.len";
+
+    /**
+     * Creates a streamer over the audit log path, name and rotation reported by {@link XLogService}.
+     *
+     * @param logFilter the filter applied to the log
+     * @param requestParameters the request parameters
+     */
+    public XLogAuditStreamer(XLogFilter logFilter, Map<String, String[]> requestParameters) {
+        super(logFilter, Services.get().get(XLogService.class).getOozieAuditLogPath(),
+                Services.get().get(XLogService.class).getOozieAuditLogName(),
+                Services.get().get(XLogService.class).getOozieAuditLogRotation());
+        this.requestParam = requestParameters;
+        bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 3);
+    }
+
+    /**
+     * Creates a streamer with an {@link XLogAuditFilter} built from the request parameters.
+     *
+     * @param requestParameters the request parameters
+     * @throws CommandException if the filter cannot be built from the request parameters
+     */
+    public XLogAuditStreamer(Map<String, String[]> requestParameters) throws CommandException {
+        this(new XLogAuditFilter(new XLogUserFilterParam(requestParameters)), requestParameters);
+    }
+
+    /**
+     * Calculates and checks the scan dates through the filter; the scan duration validation done by
+     * {@link XLogStreamer} is not applied to audit logs.
+     *
+     * @param startTime the job start time
+     * @param endTime the job end time
+     * @throws IOException if the filter rejects the dates
+     */
+    @Override
+    protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException {
+        logFilter.calculateAndCheckDates(startTime, endTime);
+    }
+
+    @Override
+    public boolean isLogEnabled() {
+        return Services.get().get(XLogService.class).isAuditLogEnabled();
+    }
+
+    @Override
+    public String getLogType() {
+        return RestConstants.JOB_SHOW_AUDIT_LOG;
+    }
+
+    @Override
+    public String getLogDisableMessage() {
+        return "Audit Log is disabled!!";
+    }
+
+    /**
+     * Counts one line per call and asks for a flush once more than {@link #getBufferLen()} lines were counted.
+     *
+     * @param byteCountIgnored not used; audit output is counted in lines
+     * @return true if the writer should be flushed now
+     */
+    @Override
+    public boolean shouldFlushOutput(int byteCountIgnored) {
+        // audit flush is done on number of lines written, not byte
+        this.totalDataWritten += 1;
+        if (this.totalDataWritten > getBufferLen()) {
+            this.totalDataWritten = 0;
+            return true;
+        }
+        else {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java
new file mode 100644
index 0000000..3395925
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
+
+/**
+ * Streams the Oozie error log, using its own flush threshold.
+ */
+public class XLogErrorStreamer extends XLogStreamer {
+
+    /** Configuration key for the error log flush threshold. */
+    public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "error.buffer.len";
+
+    /**
+     * Creates a streamer over the error log path, name and rotation reported by {@link XLogService}.
+     *
+     * @param logFilter the filter applied to the log
+     * @param requestParameters the request parameters
+     */
+    public XLogErrorStreamer(XLogFilter logFilter, Map<String, String[]> requestParameters) {
+        super(logFilter, xLogService().getOozieErrorLogPath(), xLogService().getOozieErrorLogName(),
+                xLogService().getOozieErrorLogRotation());
+        requestParam = requestParameters;
+        bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 2048);
+    }
+
+    /**
+     * Creates a streamer with an {@link XLogFilter} built from the request parameters.
+     *
+     * @param requestParameters the request parameters
+     * @throws CommandException if the filter cannot be built from the request parameters
+     */
+    public XLogErrorStreamer(Map<String, String[]> requestParameters) throws CommandException {
+        this(new XLogFilter(new XLogUserFilterParam(requestParameters)), requestParameters);
+    }
+
+    /**
+     * Only calculates and checks the scan dates; no scan duration validation or truncation is done.
+     */
+    @Override
+    protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException {
+        logFilter.calculateAndCheckDates(startTime, endTime);
+    }
+
+    @Override
+    public boolean isLogEnabled() {
+        return xLogService().isErrorLogEnabled();
+    }
+
+    @Override
+    public String getLogType() {
+        return RestConstants.JOB_SHOW_ERROR_LOG;
+    }
+
+    @Override
+    public String getLogDisableMessage() {
+        return "Error Log is disabled!!";
+    }
+
+    /** Looks up the {@link XLogService} from the running {@link Services}. */
+    private static XLogService xLogService() {
+        return Services.get().get(XLogService.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogFilter.java b/core/src/main/java/org/apache/oozie/util/XLogFilter.java
index cdf172b..505f945 100644
--- a/core/src/main/java/org/apache/oozie/util/XLogFilter.java
+++ b/core/src/main/java/org/apache/oozie/util/XLogFilter.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.oozie.util;
 
 import java.io.IOException;
@@ -56,6 +55,7 @@ public class XLogFilter {
     private boolean isActionList = false;
     private String formattedEndDate;
     private String formattedStartDate;
+    private String truncatedMessage;
 
     // TODO Patterns to be read from config file
     private static final String DEFAULT_REGEX = "[^\\]]*";
@@ -290,8 +290,9 @@ public class XLogFilter {
     }
 
     public String getDebugMessage() {
-        return "Log start time = " + getStartDate() + ". Log end time = " + getEndDate() + ". User Log Filter = "
-                + getUserLogFilter() + System.getProperty("line.separator");
+        return new StringBuilder("Log start time = ").append(getStartDate()).append(". Log end time = ")
+                .append(getEndDate()).append(". User Log Filter = ").append(getUserLogFilter())
+                .append(System.getProperty("line.separator")).toString();
     }
 
     public boolean isActionList() {
@@ -302,7 +303,20 @@ public class XLogFilter {
         this.isActionList = isActionList;
     }
 
-    private void calculateScanDate(Date jobStartTime, Date jobEndTime) throws IOException {
+    /**
+     * Calculate the scan start and end dates and check that the start date is not after the end date.
+     *
+     * @param jobStartTime the job start time
+     * @param jobEndTime the job end time
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public void calculateAndCheckDates(Date jobStartTime, Date jobEndTime) throws IOException {
+
+        // for testcase, otherwise jobStartTime and jobEndTime will be always
+        // set
+        if (jobStartTime == null || jobEndTime == null) {
+            return;
+        }
 
         if (userLogFilter.getStartDate() != null) {
             startDate = userLogFilter.getStartDate();
@@ -311,14 +325,15 @@ public class XLogFilter {
             startDate = adjustOffset(jobStartTime, userLogFilter.getStartOffset());
         }
         else {
-            startDate = jobStartTime;
+            startDate = new Date(jobStartTime.getTime());
         }
 
         if (userLogFilter.getEndDate() != null) {
             endDate = userLogFilter.getEndDate();
         }
         else if (userLogFilter.getEndOffset() != -1) {
-            // If user has specified startdate as absolute then end offset will be on user start date,
+            // If user has specified startdate as absolute then end offset will
+            // be on user start date,
             // else end offset will be calculated on job startdate.
             if (userLogFilter.getStartDate() != null) {
                 endDate = adjustOffset(startDate, userLogFilter.getEndOffset());
@@ -328,14 +343,14 @@ public class XLogFilter {
             }
         }
         else {
-            endDate = jobEndTime;
+            endDate = new Date(jobEndTime.getTime());
         }
         // if recent offset is specified then start time = endtime - offset
         if (getUserLogFilter().getRecent() != -1) {
             startDate = adjustOffset(endDate, userLogFilter.getRecent() * -1);
         }
 
-        //add buffer iff dates are not asbsolute
+        // add buffer if dates are not absolute
         if (userLogFilter.getStartDate() == null) {
             startDate = adjustOffset(startDate, -LOG_TIME_BUFFER);
         }
@@ -345,26 +360,27 @@ public class XLogFilter {
 
         formattedEndDate = XLogUserFilterParam.dt.get().format(getEndDate());
         formattedStartDate = XLogUserFilterParam.dt.get().format(getStartDate());
+
+        if (startDate.after(endDate)) {
+            throw new IOException(
+                    "Start time should be less than end time. startTime = " + startDate + " endTime = " + endDate);
+        }
     }
 
     /**
-     * Calculate and validate date range.
+     * Validate the date range, truncating the start date when the range exceeds the configured maximum scan duration.
      *
      * @param jobStartTime the job start time
      * @param jobEndTime the job end time
      * @throws IOException Signals that an I/O exception has occurred.
      */
-    public void calculateAndValidateDateRange(Date jobStartTime, Date jobEndTime) throws IOException {
-        // for testcase, otherwise jobStartTime and jobEndTime will be always set
+    public void validateDateRange(Date jobStartTime, Date jobEndTime) throws IOException {
+        // for testcase, otherwise jobStartTime and jobEndTime will be always
+        // set
         if (jobStartTime == null || jobEndTime == null) {
             return;
         }
-        calculateScanDate(jobStartTime, jobEndTime);
 
-        if (startDate.after(endDate)) {
-            throw new IOException("Start time should be less than end time. startTime = " + startDate + " endtime = "
-                    + endDate);
-        }
         long diffHours = (endDate.getTime() - startDate.getTime()) / (60 * 60 * 1000);
         if (isActionList) {
             int actionLogDuration = ConfigurationService.getInt(MAX_ACTIONLIST_SCAN_DURATION);
@@ -372,10 +388,9 @@ public class XLogFilter {
                 return;
             }
             if (diffHours > actionLogDuration) {
-                throw new IOException(
-                        "Request log streaming time range with action list is higher than configured. Please reduce the scan "
-                                + "time range. Input range (hours) = " + diffHours
-                                + " system allowed (hours) with action list = " + actionLogDuration);
+                setTruncatedMessage("Truncated logs to max log scan duration " + actionLogDuration + " hrs");
+                startDate = adjustOffset(endDate, -1 * actionLogDuration * 60);
+                startDate = adjustOffset(startDate, -1 * LOG_TIME_BUFFER);
             }
         }
         else {
@@ -384,14 +399,27 @@ public class XLogFilter {
                 return;
             }
             if (diffHours > logDuration) {
-                throw new IOException(
-                        "Request log streaming time range is higher than configured. Please reduce the scan time range. For coord"
-                                + " jobs you can provide action list to reduce log scan time range. Input range (hours) = "
-                                + diffHours + " system allowed (hours) = " + logDuration);
+                setTruncatedMessage("Truncated logs to max log scan duration " + logDuration + " hrs");
+                startDate = adjustOffset(endDate, -1 * logDuration * 60);
+                startDate = adjustOffset(startDate, -1 * LOG_TIME_BUFFER);
             }
         }
     }
 
+    protected void setTruncatedMessage(String message) {
+        truncatedMessage = message;
+
+    }
+
+    public String getTruncatedMessage() {
+        if (StringUtils.isEmpty(truncatedMessage)) {
+            return truncatedMessage;
+        }
+        else {
+            return truncatedMessage + System.getProperty("line.separator");
+        }
+    }
+
     /**
      * Adjust offset, offset will always be in min.
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
index 19f1fee..012fbef 100644
--- a/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
+++ b/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
@@ -25,28 +25,64 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.io.BufferedReader;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.Service;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
+
 /**
  * XLogStreamer streams the given log file to writer after applying the given filter.
  */
 public class XLogStreamer {
     private static XLog LOG = XLog.getLog(XLogStreamer.class);
+    protected static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService.";
+    public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
+
     private String logFile;
     private String logPath;
-    private XLogFilter logFilter;
+    protected XLogFilter logFilter;
     private long logRotation;
+    Map<String, String[]> requestParam;
+    protected int totalDataWritten;
+    protected int bufferLen;
 
     public XLogStreamer(XLogFilter logFilter, String logPath, String logFile, long logRotationSecs) {
-        this.logFilter = logFilter;
         if (logFile == null) {
             logFile = "oozie-app.log";
         }
+
+        this.logFilter = logFilter;
         this.logFile = logFile;
         this.logPath = logPath;
         this.logRotation = logRotationSecs * 1000l;
+        bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 4096);
+    }
+
+    public XLogStreamer(XLogFilter logFilter) {
+        this(logFilter, Services.get().get(XLogService.class).getOozieLogPath(), Services.get().get(XLogService.class)
+                .getOozieLogName(), Services.get().get(XLogService.class).getOozieLogRotation());
+
+    }
+
+    public XLogStreamer(XLogFilter logFilter, Map<String, String[]> params) {
+        this(logFilter, Services.get().get(XLogService.class).getOozieLogPath(), Services.get().get(XLogService.class)
+                .getOozieLogName(), Services.get().get(XLogService.class).getOozieLogRotation());
+        this.requestParam = params;
+
+    }
+
+
+    public XLogStreamer(Map<String, String[]> params) throws CommandException {
+        this(new XLogFilter(new XLogUserFilterParam(params)));
+        this.requestParam = params;
     }
 
     /**
@@ -58,15 +94,34 @@ public class XLogStreamer {
      * @param endTime
      * @throws IOException
      */
-    public void streamLog(Writer writer, Date startTime, Date endTime, int bufferLen) throws IOException {
+    public void streamLog(Writer writer, Date startTime, Date endTime) throws IOException {
+        streamLog(writer, startTime, endTime, true);
+    }
+
+    /**
+     * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
+     * applying the filters
+     *
+     * @param writer the writer
+     * @param startTime the start time
+     * @param endTime the end time
+     * @param appendDebug whether to write the truncation and debug messages before the log content
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public void streamLog(Writer writer, Date startTime, Date endTime, boolean appendDebug) throws IOException {
         // Get a Reader for the log file(s)
         BufferedReader reader = new BufferedReader(getReader(startTime, endTime));
         try {
-            if(logFilter.isDebugMode()){
-                writer.write(logFilter.getDebugMessage());
+            if (appendDebug) {
+                if (!StringUtils.isEmpty(logFilter.getTruncatedMessage())) {
+                    writer.write(logFilter.getTruncatedMessage());
+                }
+                if (logFilter.isDebugMode()) {
+                    writer.write(logFilter.getDebugMessage());
+                }
             }
             // Process the entire logs from the reader using the logFilter
-            new TimestampedMessageParser(reader, logFilter).processRemaining(writer, bufferLen);
+            new TimestampedMessageParser(reader, logFilter).processRemaining(writer, this);
         }
         finally {
             reader.close();
@@ -83,12 +138,17 @@ public class XLogStreamer {
      */
 
     private MultiFileReader getReader(Date startTime, Date endTime) throws IOException {
-        logFilter.calculateAndValidateDateRange(startTime, endTime);
+        calculateAndValidateDateRange(startTime, endTime);
         return new MultiFileReader(getFileList(logFilter.getStartDate(), logFilter.getEndDate()));
     }
 
+    protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException {
+        logFilter.calculateAndCheckDates(startTime, endTime);
+        logFilter.validateDateRange(startTime, endTime);
+    }
+
     public BufferedReader makeReader(Date startTime, Date endTime) throws IOException {
-        return new BufferedReader(getReader(startTime,endTime));
+        return new BufferedReader(getReader(startTime, endTime));
     }
 
     /**
@@ -220,7 +280,8 @@ public class XLogStreamer {
             LOG.warn("oozie.log has been GZipped, which is unexpected");
             // Return a value other than -1 to include the file in list
             returnVal = 0;
-        } else {
+        }
+        else {
             Matcher m = gzTimePattern.matcher(fileName);
             if (m.matches() && m.groupCount() == 4) {
                 int year = Integer.parseInt(m.group(1));
@@ -242,11 +303,47 @@ public class XLogStreamer {
                         || (startTime <= logFileStartTime && endTime >= logFileEndTime)) {
                     returnVal = logFileStartTime;
                 }
-            } else {
+            }
+            else {
                 LOG.debug("Filename " + fileName + " does not match the expected format");
                 returnVal = -1;
             }
         }
         return returnVal;
     }
+
+    public boolean isLogEnabled() {
+        return Services.get().get(XLogService.class).getLogOverWS();
+    }
+
+    public String getLogType() {
+        return RestConstants.JOB_SHOW_LOG;
+    }
+
+    public XLogFilter getXLogFilter() {
+        return logFilter;
+    }
+
+    public String getLogDisableMessage() {
+        return "Log streaming disabled!!";
+    }
+
+    public Map<String, String[]> getRequestParam() {
+        return requestParam;
+    }
+
+    public boolean shouldFlushOutput(int writtenBytes) {
+        this.totalDataWritten += writtenBytes;
+        if (this.totalDataWritten > getBufferLen()) {
+            this.totalDataWritten = 0;
+            return true;
+        }
+        else {
+            return false;
+        }
+    }
+
+    public int getBufferLen() {
+        return bufferLen;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index ae384b4..e7a48a0 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -192,7 +192,23 @@
     <property>
         <name>oozie.service.XLogStreamingService.buffer.len</name>
         <value>4096</value>
-        <description>4K buffer for streaming the logs progressively</description>
+        <description>4K buffer for streaming the logs progressively
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.XLogStreamingService.error.buffer.len</name>
+        <value>2048</value>
+        <description>2K buffer for streaming the error logs
+            progressively
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.XLogStreamingService.audit.buffer.len</name>
+        <value>3</value>
+        <description>Number of lines for streaming the audit logs
+            progressively
+        </description>
     </property>
 
  <!-- HCatAccessorService -->
@@ -2223,6 +2239,14 @@ will be the requeue interval for the actions which are waiting for a long time w
     </property>
 
     <property>
+        <name>oozie.server.connection.timeout.seconds</name>
+        <value>180</value>
+        <description>
+            Connection timeout, in seconds, used when an Oozie server communicates with another Oozie server over HTTP(s). Default is 180 seconds (3 minutes).
+        </description>
+    </property>
+
+    <property>
         <name>oozie.authentication.token.validity</name>
         <value>36000</value>
         <description>

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
index 3eb1016..d6f1aef 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
@@ -26,13 +26,9 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.Job;
 import org.apache.oozie.client.rest.RestConstants;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.Services;
@@ -40,6 +36,7 @@ import org.apache.oozie.service.XLogStreamingService;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
 
 public class TestCoordinatorEngineStreamLog extends XDataTestCase {
     private Services services;
@@ -63,9 +60,9 @@ public class TestCoordinatorEngineStreamLog extends XDataTestCase {
         Date endTime;
 
         @Override
-        public void streamLog(XLogFilter filter1, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
+        public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer)
                 throws IOException {
-            filter = filter1;
+            filter = logStreamer.getXLogFilter();
             this.startTime = startTime;
             this.endTime = endTime;
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
index 2a3d3d3..3c6525d 100644
--- a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
@@ -36,6 +36,7 @@ import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.util.ConfigUtils;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
 import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
 
 import java.io.File;
@@ -176,7 +177,7 @@ public class TestConfigurationService extends XTestCase {
         assertEquals("http://0.0.0.0:11000/oozie/callback", ConfigurationService.get(CallbackService.CONF_BASE_URL));
         assertEquals(5, ConfigurationService.getInt(CallbackService.CONF_EARLY_REQUEUE_MAX_RETRIES));
         assertEquals("gz", ConfigurationService.get(CodecFactory.COMPRESSION_OUTPUT_CODEC));
-        assertEquals(4096, ConfigurationService.getInt(XLogStreamingService.STREAM_BUFFER_LEN));
+        assertEquals(4096, ConfigurationService.getInt(XLogStreamer.STREAM_BUFFER_LEN));
         assertEquals(10000,  ConfigurationService.getLong(JvmPauseMonitorService.WARN_THRESHOLD_KEY));
         assertEquals(60, ConfigurationService.getInt(InstrumentationService.CONF_LOGGING_INTERVAL));
         assertEquals(30, ConfigurationService.getInt(PurgeService.CONF_OLDER_THAN));

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
index bebb678..1921f1b 100644
--- a/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
@@ -23,18 +23,23 @@ import org.apache.oozie.command.CommandException;
 import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XLogAuditFilter;
+import org.apache.oozie.util.XLogAuditStreamer;
+import org.apache.oozie.util.XLogErrorStreamer;
 import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
 import org.apache.oozie.util.XLogUserFilterParam;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.InputStream;
 import java.io.StringWriter;
-import java.util.HashMap;
+import java.util.Date;
 import java.util.Properties;
 
 public class TestXLogStreamingService extends XTestCase {
 
+    final int FIFTEEN_HOURS = 60 * 60 * 1000 * 15;
+
     @Override
     protected void setUp() throws Exception {
         super.setUp();
@@ -386,6 +391,28 @@ public class TestXLogStreamingService extends XTestCase {
         }
     }
 
+    public void testTuncateLog() throws Exception {
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+        new Services().init();
+        ConfigurationService.set(XLogFilter.MAX_SCAN_DURATION, "10");
+        Date startDate = new Date();
+        Date endDate = new Date(startDate.getTime() + FIFTEEN_HOURS);
+        String log = doStreamLog(new XLogFilter(), startDate, endDate);
+        assertTrue(log.contains("Truncated logs to max log scan duration"));
+        log = doStreamErrorLog(new XLogFilter(), startDate, endDate);
+        assertFalse(log.contains("Truncated logs to max log scan duration"));
+        log = doStreamAuditLog(new XLogFilter(), startDate, endDate);
+        assertFalse(log.contains("Truncated logs to max log scan duration"));
+    }
+
     private boolean doStreamDisabledCheckWithServices() throws Exception {
         boolean result = false;
         try {
@@ -405,21 +432,35 @@ public class TestXLogStreamingService extends XTestCase {
     }
 
     private String doStreamLog(XLogFilter xf) throws Exception {
-        StringWriter w = new StringWriter();
-        Services.get().get(XLogStreamingService.class).streamLog(xf, null, null, w, new HashMap<String, String[]>());
-        return w.toString();
+        return doStreamLog(new XLogStreamer(xf), null, null);
     }
+
     private String doStreamErrorLog(XLogFilter xf) throws Exception {
-        StringWriter w = new StringWriter();
-        Services.get().get(XLogStreamingService.class).streamErrorLog(xf, null, null, w, new HashMap<String, String[]>());
-        return w.toString();
+        return doStreamLog(new XLogErrorStreamer(xf, null), null, null);
     }
+
     private String doStreamAuditLog(XLogFilter xf) throws Exception {
+        return doStreamLog(new XLogAuditStreamer(xf, null), null, null);
+    }
+
+    private String doStreamLog(XLogStreamer logStreamer,  Date startDate, Date endDate) throws Exception {
         StringWriter w = new StringWriter();
-        Services.get().get(XLogStreamingService.class).streamAuditLog(xf, null, null, w, new HashMap<String, String[]>());
+        Services.get().get(XLogStreamingService.class)
+                .streamLog(logStreamer, startDate, endDate, w);
         return w.toString();
     }
 
+    private String doStreamLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
+        return doStreamLog(new XLogStreamer(xf, null), startDate, endDate);
+    }
+
+    private String doStreamErrorLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
+        return doStreamLog(new XLogErrorStreamer(xf, null), startDate, endDate); // stream the requested range
+    }
+
+    private String doStreamAuditLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
+        return doStreamLog(new XLogAuditStreamer(xf, null), startDate, endDate); // stream the requested range
+    }
 
     private boolean doErrorStreamDisabledCheck() throws Exception {
         XLogFilter xf = new XLogFilter(new XLogUserFilterParam(null));

http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
index fca8d84..df097fd 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
@@ -32,7 +32,9 @@ import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.test.EmbeddedServletContainer;
 import org.apache.oozie.test.ZKXTestCase;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XLogErrorStreamer;
 import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
 import org.apache.oozie.util.ZKUtils;
 
 public class TestZKXLogStreamingService extends ZKXTestCase {
@@ -198,15 +200,29 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
         return doStreamLog(xf, new HashMap<String, String[]>());
     }
 
+    protected String doStreamLog(XLogFilter xf, Date startTime, Date endTime) throws Exception {
+        return doStreamLog(xf, new HashMap<String, String[]>(), false, startTime, endTime);
+    }
+
     protected String doStreamErrorLog(XLogFilter xf) throws Exception {
         return doStreamLog(xf, new HashMap<String, String[]>(), true);
     }
 
+    private String doStreamErrorLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
+        return doStreamLog(xf, new HashMap<String, String[]>(), true, startDate, endDate); // end of range is endDate
+    }
+
+
     protected String doStreamLog(XLogFilter xf, Map<String, String[]> param) throws Exception {
         return doStreamLog(xf, param, false);
     }
 
     protected String doStreamLog(XLogFilter xf, Map<String, String[]> param, boolean isErrorLog) throws Exception {
+        return doStreamLog(xf, param, isErrorLog, null, null);
+    }
+
+    protected String doStreamLog(XLogFilter xf, Map<String, String[]> param, boolean isErrorLog, Date startTime,
+            Date endTime) throws Exception {
         StringWriter w = new StringWriter();
         ZKXLogStreamingService zkxlss = new ZKXLogStreamingService();
         try {
@@ -215,10 +231,10 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
             zkxlss.init(services);
             sleep(1000); // Sleep to allow ZKUtils ServiceCache to update
             if (isErrorLog) {
-                zkxlss.streamErrorLog(xf, null, null, w, param);
+                zkxlss.streamLog(new XLogErrorStreamer(xf, param), startTime, endTime, w);
             }
             else {
-                zkxlss.streamLog(xf, null, null, w, param);
+                zkxlss.streamLog(new XLogStreamer(xf, param), startTime, endTime, w);
             }
         }
         finally {
@@ -583,4 +599,30 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
         }
     }
 
+    public void testTuncateLog() throws Exception {
+        XLogFilter.reset();
+        File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+        Properties log4jProps = new Properties();
+        log4jProps.load(is);
+        // prevent conflicts with other tests by changing the log file location
+        log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+        log4jProps.store(new FileOutputStream(log4jFile), "");
+        setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+        assertFalse(doStreamDisabledCheck());
+        File logFile = new File(Services.get().get(XLogService.class).getOozieLogPath(),
+                                Services.get().get(XLogService.class).getOozieLogName());
+        logFile.getParentFile().mkdirs();
+
+        ConfigurationService.set(XLogFilter.MAX_SCAN_DURATION, "1");
+        Date startDate = new Date();
+        Date endDate = new Date(startDate.getTime() + 60 * 60 * 1000 * 15);
+
+        String log = doStreamLog(new XLogFilter(), startDate, endDate);
+        assertTrue(log.contains("Truncated logs to max log scan duration"));
+        String logError = doStreamErrorLog(new XLogFilter(), startDate, endDate);
+        assertFalse(logError.contains("Truncated logs to max log scan duration"));
+    }
+
 }


[3/3] oozie git commit: OOZIE-2862 Coord change command doesn't change job to running if job was killed without creating any actions

Posted by pu...@apache.org.
OOZIE-2862 Coord change command doesn't change job to running if job was killed without creating any actions


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/cf3b400a
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/cf3b400a
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/cf3b400a

Branch: refs/heads/master
Commit: cf3b400a2a8e66f8689856bfb16b2547526a012a
Parents: 993f06d
Author: puru <pu...@gmail.com>
Authored: Wed Apr 12 14:17:13 2017 -0700
Committer: puru <pu...@gmail.com>
Committed: Wed Apr 12 14:17:13 2017 -0700

----------------------------------------------------------------------
 .../command/coord/CoordChangeXCommand.java      |  4 +-
 .../command/coord/TestCoordChangeXCommand.java  | 40 ++++++++++++++++----
 release-log.txt                                 |  1 +
 3 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/cf3b400a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
index e65b74f..ff7bf8a 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
@@ -436,8 +436,8 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
                 LOG.info("Coord status is changed to " + jobStatus + " from " + prevStatus);
                 if (jobStatus.equals(CoordinatorJob.Status.RUNNING)) {
                     coordJob.setPending();
-                    if (coordJob.getNextMaterializedTime() != null
-                            && coordJob.getEndTime().after(coordJob.getNextMaterializedTime())) {
+                    if (coordJob.getNextMaterializedTime() == null
+                            || coordJob.getEndTime().after(coordJob.getNextMaterializedTime())) {
                         coordJob.resetDoneMaterialization();
                     }
                 } else if (jobStatus.equals(CoordinatorJob.Status.IGNORED)) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/cf3b400a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index 8034bbe..c071000 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -59,6 +59,7 @@ import org.apache.oozie.util.XCallable;
 
 public class TestCoordChangeXCommand extends XDataTestCase {
     private Services services;
+    public static final int HOURS_IN_MS = 60 * 60 * 1000;
 
     /**
      * Return the UTC date and time in W3C format down to second
@@ -538,7 +539,7 @@ public class TestCoordChangeXCommand extends XDataTestCase {
     // Testcase to check status for coord whose enddate is set before startdate.
     public void testCoordChangeEndTimeBeforeStart() throws Exception {
         Date start = new Date();
-        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+        Date end = new Date(start.getTime() + (4 * HOURS_IN_MS)); // 4 hrs
         Date endTime = new Date(start.getTime() - 3000);
 
         String endTimeChangeStr = "endtime=" + DateUtils.formatDateOozieTZ(endTime);
@@ -624,7 +625,7 @@ public class TestCoordChangeXCommand extends XDataTestCase {
         Date startTime = DateUtils.parseDateOozieTZ("2013-08-01T00:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2013-08-01T04:59Z");
 
-        Date pauseTime = new Date(startTime.getTime() + (3 * 60 * 60 * 1000));  //2 hrs
+        Date pauseTime = new Date(startTime.getTime() + (3 * HOURS_IN_MS));  // 3 hrs
         String pauseTimeChangeStr = "pausetime=" + DateUtils.formatDateOozieTZ(pauseTime);
         final CoordinatorJobBean job = addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status.RUNNING, startTime,
                 endTime, endTime, true, false, 4);
@@ -729,7 +730,7 @@ public class TestCoordChangeXCommand extends XDataTestCase {
     public void testChangeTimeDeleteRunning() throws Exception {
         Date startTime = DateUtils.parseDateOozieTZ("2013-08-01T00:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2013-08-01T04:59Z");
-        Date pauseTime = new Date(startTime.getTime() + (2 * 60 * 60 * 1000)); // 2 hrs
+        Date pauseTime = new Date(startTime.getTime() + (2 * HOURS_IN_MS)); // 2 hrs
         String pauseTimeChangeStr = "pausetime=" + DateUtils.formatDateOozieTZ(pauseTime);
         final CoordinatorJobBean job = addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status.RUNNING,
                 startTime, endTime, endTime, true, false, 4);
@@ -763,7 +764,7 @@ public class TestCoordChangeXCommand extends XDataTestCase {
 
     public void testCoordStatus_Ignored() throws Exception {
         Date start = new Date();
-        Date end = new Date(start.getTime() + (5 * 60 * 60 * 1000)); // 5 hrs
+        Date end = new Date(start.getTime() + (5 * HOURS_IN_MS)); // 5 hrs
         String statusToRUNNING = "status=RUNNING";
         String statusToIGNORED = "status=IGNORED";
         final CoordinatorJobBean job1 = addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status.IGNORED, start,
@@ -808,7 +809,7 @@ public class TestCoordChangeXCommand extends XDataTestCase {
     // Status change from failed- successful
     public void testCoordStatus_Failed() throws Exception {
         Date start = new Date();
-        Date end = new Date(start.getTime() + (5 * 60 * 60 * 1000)); // 5 hrs
+        Date end = new Date(start.getTime() + (5 * HOURS_IN_MS)); // 5 hrs
         String status = "status=RUNNING";
         final CoordinatorJobBean job = addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status.FAILED, start,
                 end, end, true, false, 4);
@@ -837,7 +838,7 @@ public class TestCoordChangeXCommand extends XDataTestCase {
     //  Status change from Killed- successful
     public void testCoordStatus_Killed() throws Exception {
         Date start = new Date();
-        Date end = new Date(start.getTime() + (5 * 60 * 60 * 1000)); // 5 hrs
+        Date end = new Date(start.getTime() + (5 * HOURS_IN_MS)); // 5 hrs
         String status = "status=RUNNING";
         final CoordinatorJobBean job = addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status.KILLED, start,
                 end, end, true, false, 4);
@@ -866,7 +867,7 @@ public class TestCoordChangeXCommand extends XDataTestCase {
     // Check status change from Succeeded-  exception
     public void testCoordStatus_Changefailed() throws Exception {
         Date start = new Date();
-        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 5 hrs
+        Date end = new Date(start.getTime() + (4 * HOURS_IN_MS)); // 4 hrs
         String status = "status=RUNNING";
         final CoordinatorJobBean job = addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status.SUCCEEDED, start,
                 end, end, true, false, 4);
@@ -886,7 +887,7 @@ public class TestCoordChangeXCommand extends XDataTestCase {
     // Check status change - with multiple option. Pause can't be applied to killed job, old behavior.
     public void testCoord_throwException() throws Exception {
         Date start = new Date();
-        Date end = new Date(start.getTime() + (4 * 60 * 60 * 1000)); // 4 hrs
+        Date end = new Date(start.getTime() + (4 * HOURS_IN_MS)); // 4 hrs
         String status = "status=RUNNING;pausetime=" + DateUtils.formatDateOozieTZ(end);
         final CoordinatorJobBean job = addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status.KILLED, start,
                 end, end, true, false, 4);
@@ -903,6 +904,29 @@ public class TestCoordChangeXCommand extends XDataTestCase {
             assertTrue(e.getMessage().contains("Cannot change a killed coordinator job"));
         }
     }
+
+    // Check status change - when job is killed with no action created
+    public void testRunningStatusWithNoAction() throws Exception {
+        Date now = new Date();
+        Date start = new Date(now.getTime() - (4 * HOURS_IN_MS)); // 4 hrs
+        Date end = new Date(now.getTime() + (4 * HOURS_IN_MS)); // 4 hrs
+
+        String status = "status=RUNNING";
+        CoordinatorJobBean job = addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status.KILLED, start, end,
+                null, true, false, 0);
+        job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, job.getId());
+        assertEquals("KILLED", job.getStatusStr());
+        assertNull(job.getNextMaterializedTime());
+        new CoordChangeXCommand(job.getId(), status).call();
+        job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, job.getId());
+        assertEquals("RUNNING", job.getStatusStr());
+        // make sure that action is created
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, job.getId());
+        assertNotNull(job.getNextMaterializedTime());
+        assertEquals(1, job.getLastActionNumber());
+    }
+
     protected CoordinatorJobBean addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status status, Date start,
             Date end, Date lastActionTime, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
         CoordinatorJobBean coordJob = createCoordJob(status, start, end, pending, doneMatd, lastActionNum);

http://git-wip-us.apache.org/repos/asf/oozie/blob/cf3b400a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index e141d0e..6fb121e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2862 Coord change command doesn't change job to running if job was killed without creating any actions (puru)
 OOZIE-2815 Oozie not always display job log (puru)
 OOZIE-2860 Improve Jetty logging (andras.piros via pbacsko)
 OOZIE-2457 Oozie log parsing regex consume more than 90% cpu (satishsaley)