You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2017/04/12 21:17:44 UTC
[2/3] oozie git commit: OOZIE-2815 Oozie not always display job log
OOZIE-2815 Oozie not always display job log
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/993f06df
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/993f06df
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/993f06df
Branch: refs/heads/master
Commit: 993f06df44b05ece89afd85cd662bb325ed5f8a3
Parents: 2237bbd
Author: puru <pu...@gmail.com>
Authored: Wed Apr 12 14:15:36 2017 -0700
Committer: puru <pu...@gmail.com>
Committed: Wed Apr 12 14:15:36 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/oozie/BaseEngine.java | 85 +++++++-----
.../java/org/apache/oozie/BundleEngine.java | 50 ++-----
.../org/apache/oozie/CoordinatorEngine.java | 65 ++-------
.../main/java/org/apache/oozie/DagEngine.java | 83 ++---------
.../org/apache/oozie/service/XLogService.java | 12 +-
.../oozie/service/XLogStreamingService.java | 80 +++--------
.../oozie/service/ZKXLogStreamingService.java | 138 +++++--------------
.../org/apache/oozie/servlet/V0JobServlet.java | 3 +-
.../org/apache/oozie/servlet/V1JobServlet.java | 4 +-
.../org/apache/oozie/util/AuthUrlClient.java | 3 +-
.../oozie/util/TimestampedMessageParser.java | 36 +----
.../org/apache/oozie/util/XLogAuditFilter.java | 16 ---
.../apache/oozie/util/XLogAuditStreamer.java | 79 +++++++++++
.../apache/oozie/util/XLogErrorStreamer.java | 67 +++++++++
.../java/org/apache/oozie/util/XLogFilter.java | 76 ++++++----
.../org/apache/oozie/util/XLogStreamer.java | 117 ++++++++++++++--
core/src/main/resources/oozie-default.xml | 26 +++-
.../oozie/TestCoordinatorEngineStreamLog.java | 9 +-
.../oozie/service/TestConfigurationService.java | 3 +-
.../oozie/service/TestXLogStreamingService.java | 57 ++++++--
.../service/TestZKXLogStreamingService.java | 46 ++++++-
.../org/apache/oozie/util/TestLogStreamer.java | 47 ++++++-
.../TestSimplifiedTimestampedMessageParser.java | 15 +-
.../util/TestTimestampedMessageParser.java | 9 +-
.../oozie/util/TestXLogUserFilterParam.java | 34 +----
release-log.txt | 1 +
webapp/src/main/webapp/oozie-console.js | 4 +-
27 files changed, 639 insertions(+), 526 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/BaseEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BaseEngine.java b/core/src/main/java/org/apache/oozie/BaseEngine.java
index 50df897..2780ec2 100644
--- a/core/src/main/java/org/apache/oozie/BaseEngine.java
+++ b/core/src/main/java/org/apache/oozie/BaseEngine.java
@@ -20,25 +20,22 @@ package org.apache.oozie;
import java.io.IOException;
import java.io.Writer;
-import java.util.Date;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JMSTopicService;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.XLogStreamingService;
-import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogAuditStreamer;
+import org.apache.oozie.util.XLogErrorStreamer;
+import org.apache.oozie.util.XLogStreamer;
public abstract class BaseEngine {
public static final String USE_XCOMMAND = "oozie.useXCommand";
- public enum LOG_TYPE {
- LOG, ERROR_LOG, AUDIT_LOG
- }
-
protected String user;
/**
@@ -169,39 +166,60 @@ public abstract class BaseEngine {
*
* @param jobId job Id.
* @param writer writer to stream the log to.
- * @param params additional parameters from the request
+ * @param requestParameters additional parameters from the request
* @throws IOException thrown if the log cannot be streamed.
* @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
* jobId.
*/
- public abstract void streamLog(String jobId, Writer writer, Map<String, String[]> params)
- throws IOException, BaseEngineException;
+ public void streamLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException,
+ BaseEngineException {
+ try {
+
+ streamJobLog(new XLogStreamer(requestParameters), jobId, writer);
+ }
+ catch (CommandException e) {
+ throw new IOException(e);
+ }
+ }
/**
* Stream error log of a job.
*
* @param jobId job Id.
* @param writer writer to stream the log to.
- * @param params additional parameters from the request
+ * @param requestParameters additional parameters from the request
* @throws IOException thrown if the log cannot be streamed.
* @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
* jobId.
*/
- public abstract void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
- BaseEngineException;
- /**
+ public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException,
+ BaseEngineException {
+ try {
+
+ streamJobLog(new XLogErrorStreamer(requestParameters), jobId, writer);
+ }
+ catch (CommandException e) {
+ throw new IOException(e);
+ }
+ } /**
* Stream Audit log of a job.
*
* @param jobId job Id.
* @param writer writer to stream the log to.
- * @param params additional parameters from the request
+ * @param requestParameters additional parameters from the request
* @throws IOException thrown if the log cannot be streamed.
* @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
* jobId.
*/
- public abstract void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
- BaseEngineException;
-
+ public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> requestParameters) throws IOException,
+ BaseEngineException {
+ try {
+ streamJobLog(new XLogAuditStreamer(requestParameters), jobId, writer);
+ }
+ catch (CommandException e) {
+ throw new IOException(e);
+ }
+ }
/**
* Return the workflow Job ID for an external ID.
@@ -299,25 +317,16 @@ public abstract class BaseEngine {
public abstract void changeSLA(String id, String actions, String dates, String childIds, String newParams)
throws BaseEngineException;
- protected void fetchLog(XLogFilter filter, Date startTime, Date endTime, Writer writer,
- Map<String, String[]> params, LOG_TYPE logType) throws IOException {
-
- switch (logType) {
- case LOG:
- Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params);
- break;
- case ERROR_LOG:
- Services.get().get(XLogStreamingService.class)
- .streamErrorLog(filter, startTime, endTime, writer, params);
- break;
- case AUDIT_LOG:
- Services.get().get(XLogStreamingService.class)
- .streamAuditLog(filter, startTime, endTime, writer, params);
- break;
- default:
- throw new IOException("Unsupported log Type");
- }
- }
-
+ /**
+ * Stream job log.
+ *
+ * @param logStreamer the log streamer
+ * @param jobId the job id
+ * @param writer the writer
+ * @throws IOException Signals that an I/O exception has occurred.
+ * @throws BaseEngineException the base engine exception
+ */
+ protected abstract void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException,
+ BaseEngineException;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/BundleEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java
index d0099b4..e994bff 100644
--- a/core/src/main/java/org/apache/oozie/BundleEngine.java
+++ b/core/src/main/java/org/apache/oozie/BundleEngine.java
@@ -54,14 +54,14 @@ import org.apache.oozie.command.bundle.BundleSubmitXCommand;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.DagXLogInfoService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.JobsFilterUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogAuditFilter;
-import org.apache.oozie.util.XLogFilter;
-import org.apache.oozie.util.XLogUserFilterParam;
+import org.apache.oozie.util.XLogStreamer;
import com.google.common.annotations.VisibleForTesting;
@@ -245,44 +245,11 @@ public class BundleEngine extends BaseEngine {
}
}
- @Override
- public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
- BundleEngineException {
- streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
- }
-
- @Override
- public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
+ protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException,
BundleEngineException {
- streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
- }
-
- @Override
- public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
- BundleEngineException {
- try {
- streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG);
- }
- catch (CommandException e) {
- throw new IOException(e);
- }
- }
-
- private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
- throws IOException, BundleEngineException {
- try {
- streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
- }
- catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
- throws IOException, BundleEngineException {
- try {
+ try {
BundleJobBean job;
- filter.setParameter(DagXLogInfoService.JOB, jobId);
+ logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId);
job = new BundleJobXCommand(jobId).call();
Date lastTime = null;
if (job.isTerminalStatus()) {
@@ -291,9 +258,10 @@ public class BundleEngine extends BaseEngine {
if (lastTime == null) {
lastTime = new Date();
}
- fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
+ Services.get().get(XLogStreamingService.class)
+ .streamLog(logStreamer, job.getCreatedTime(), lastTime, writer);
}
- catch (Exception ex) {
+ catch (CommandException ex) {
throw new IOException(ex);
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index 2f9f822..2c04bea 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -76,8 +76,8 @@ import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.Pair;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogAuditFilter;
import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.XLogUserFilterParam;
import com.google.common.annotations.VisibleForTesting;
@@ -303,56 +303,20 @@ public class CoordinatorEngine extends BaseEngine {
throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
}
- @Override
- public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
- BaseEngineException {
- streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
- }
-
- @Override
- public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
- BaseEngineException {
- streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
- }
@Override
- public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
- BaseEngineException {
- try {
- streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG);
- }
- catch (CommandException e) {
- throw new IOException(e);
- }
- }
-
- private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
+ protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer)
throws IOException, BaseEngineException {
- try {
- streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
- }
- catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
- throws IOException, BaseEngineException {
- try {
- filter.setParameter(DagXLogInfoService.JOB, jobId);
- Date lastTime = null;
- CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
- if (job.isTerminalStatus()) {
- lastTime = job.getLastModifiedTime();
- }
- if (lastTime == null) {
- lastTime = new Date();
- }
- fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
+ logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId);
+ Date lastTime = null;
+ CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
+ if (job.isTerminalStatus()) {
+ lastTime = job.getLastModifiedTime();
}
- catch (Exception e) {
- throw new IOException(e);
+ if (lastTime == null) {
+ lastTime = new Date();
}
+ Services.get().get(XLogStreamingService.class).streamLog(logStreamer, job.getCreatedTime(), lastTime, writer);
}
/**
@@ -362,17 +326,17 @@ public class CoordinatorEngine extends BaseEngine {
* @param logRetrievalScope Value for the retrieval type
* @param logRetrievalType Based on which filter criteria the log is retrieved
* @param writer writer to stream the log to
- * @param params additional parameters from the request
+ * @param requestParameters additional parameters from the request
* @throws IOException
* @throws BaseEngineException
* @throws CommandException
*/
public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer,
- Map<String, String[]> params) throws IOException, BaseEngineException, CommandException {
+ Map<String, String[]> requestParameters) throws IOException, BaseEngineException, CommandException {
Date startTime = null;
Date endTime = null;
- XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(requestParameters));
filter.setParameter(DagXLogInfoService.JOB, jobId);
if (logRetrievalScope != null && logRetrievalType != null) {
@@ -528,7 +492,8 @@ public class CoordinatorEngine extends BaseEngine {
}
}
}
- Services.get().get(XLogStreamingService.class).streamLog(filter, startTime, endTime, writer, params);
+ Services.get().get(XLogStreamingService.class).streamLog(new XLogStreamer(filter, requestParameters), startTime,
+ endTime, writer);
}
/*
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/DagEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java
index 57d2761..36198b5 100644
--- a/core/src/main/java/org/apache/oozie/DagEngine.java
+++ b/core/src/main/java/org/apache/oozie/DagEngine.java
@@ -63,13 +63,12 @@ import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
+import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogAuditFilter;
-import org.apache.oozie.util.XLogFilter;
-import org.apache.oozie.util.XLogUserFilterParam;
+import org.apache.oozie.util.XLogStreamer;
/**
* The DagEngine provides all the DAG engine functionality for WS calls.
@@ -389,80 +388,16 @@ public class DagEngine extends BaseEngine {
}
}
- /**
- * Stream the log of a job.
- *
- * @param jobId job Id.
- * @param writer writer to stream the log to.
- * @param params additional parameters from the request
- * @throws IOException thrown if the log cannot be streamed.
- * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
- */
@Override
- public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
- DagEngineException {
- streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
- }
-
- /**
- * Stream the error log of a job.
- *
- * @param jobId job Id.
- * @param writer writer to stream the log to.
- * @param params additional parameters from the request
- * @throws IOException thrown if the log cannot be streamed.
- * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
- */
- @Override
- public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
- DagEngineException {
- streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
- }
-
- /**
- * Stream the audit log of a job.
- *
- * @param jobId job Id.
- * @param writer writer to stream the log to.
- * @param params additional parameters from the request
- * @throws IOException thrown if the log cannot be streamed.
- * @throws DagEngineException thrown if there is error in getting the Workflow Information for jobId.
- */
- @Override
- public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
- DagEngineException {
- try {
- streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)),jobId, writer, params, LOG_TYPE.AUDIT_LOG);
- }
- catch (CommandException e) {
- throw new IOException(e);
- }
- }
-
- private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
- throws IOException, DagEngineException {
- try {
- streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
- }
- catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
+ protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer)
throws IOException, DagEngineException {
- try {
- filter.setParameter(DagXLogInfoService.JOB, jobId);
- WorkflowJob job = getJob(jobId);
- Date lastTime = job.getEndTime();
- if (lastTime == null) {
- lastTime = job.getLastModifiedTime();
- }
- fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
- }
- catch (Exception e) {
- throw new IOException(e);
+ logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId);
+ WorkflowJob job = getJob(jobId);
+ Date lastTime = job.getEndTime();
+ if (lastTime == null) {
+ lastTime = job.getLastModifiedTime();
}
+ Services.get().get(XLogStreamingService.class).streamLog(logStreamer, job.getCreatedTime(), lastTime, writer);
}
private static final Set<String> FILTER_NAMES = new HashSet<String>();
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/service/XLogService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/XLogService.java b/core/src/main/java/org/apache/oozie/service/XLogService.java
index 04f04f4..0b43722 100644
--- a/core/src/main/java/org/apache/oozie/service/XLogService.java
+++ b/core/src/main/java/org/apache/oozie/service/XLogService.java
@@ -295,23 +295,23 @@ public class XLogService implements Service, Instrumentable {
});
}
- boolean getLogOverWS() {
+ public boolean getLogOverWS() {
return logOverWS;
}
- boolean isErrorLogEnabled(){
+ public boolean isErrorLogEnabled(){
return errorLogEnabled;
}
- int getOozieLogRotation() {
+ public int getOozieLogRotation() {
return oozieLogRotation;
}
- int getOozieErrorLogRotation() {
+ public int getOozieErrorLogRotation() {
return oozieErrorLogRotation;
}
- int getOozieAuditLogRotation() {
+ public int getOozieAuditLogRotation() {
return oozieAuditLogRotation;
}
@@ -323,7 +323,7 @@ public class XLogService implements Service, Instrumentable {
return oozieAuditLogName;
}
- boolean isAuditLogEnabled() {
+ public boolean isAuditLogEnabled() {
return auditLogEnabled;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
index c15c4c1..40ba46e 100644
--- a/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/XLogStreamingService.java
@@ -18,24 +18,19 @@
package org.apache.oozie.service;
-import org.apache.oozie.util.XLogFilter;
+import org.apache.commons.lang.StringUtils;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.XLogStreamer;
-
import java.io.IOException;
import java.io.Writer;
-import java.util.Map;
import java.util.Date;
/**
* Service that performs streaming of log files over Web Services if enabled in XLogService
*/
public class XLogStreamingService implements Service, Instrumentable {
- private static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService.";
- public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
- protected int bufferLen;
/**
* Initialize the log streaming service.
@@ -44,7 +39,6 @@ public class XLogStreamingService implements Service, Instrumentable {
* @throws ServiceException thrown if the log streaming service could not be initialized.
*/
public void init(Services services) throws ServiceException {
- bufferLen = ConfigurationService.getInt(services.getConf(), STREAM_BUFFER_LEN);
}
/**
@@ -71,75 +65,37 @@ public class XLogStreamingService implements Service, Instrumentable {
// nothing to instrument
}
- /**
- * Stream the log of a job.
- *
- * @param filter log streamer filter.
- * @param startTime start time for log events to filter.
- * @param endTime end time for log events to filter.
- * @param writer writer to stream the log to.
- * @param params additional parameters from the request
- * @throws IOException thrown if the log cannot be streamed.
- */
- public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
- throws IOException {
- XLogService xLogService = Services.get().get(XLogService.class);
- if (xLogService.getLogOverWS()) {
- new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(),
- xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
- }
- else {
- writer.write("Log streaming disabled!!");
- }
- }
/**
- * Stream the error log of a job.
+ * Stream the log of a job.
*
+ * @param logStreamer the log streamer
* @param filter log streamer filter.
* @param startTime start time for log events to filter.
* @param endTime end time for log events to filter.
* @param writer writer to stream the log to.
- * @param params additional parameters from the request
- * @throws IOException thrown if the log cannot be streamed.
+ * @throws IOException Signals that an I/O exception has occurred.
*/
- public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
- throws IOException {
- XLogService xLogService = Services.get().get(XLogService.class);
- if (xLogService.isErrorLogEnabled()) {
- new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(),
- xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
- }
- else {
- writer.write("Error Log is disabled!!");
- }
+ public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException {
+ streamLog(logStreamer, startTime, endTime, writer, true);
}
/**
- * Stream the audit log of a job.
+ * Stream log.
*
- * @param filter log streamer filter.
- * @param startTime start time for log events to filter.
- * @param endTime end time for log events to filter.
- * @param writer writer to stream the log to.
- * @param params additional parameters from the request
- * @throws IOException thrown if the log cannot be streamed.
+ * @param logStreamer the log streamer
+ * @param startTime the start time
+ * @param endTime the end time
+ * @param writer the writer
+ * @param appendDebug the append debug
+ * @throws IOException Signals that an I/O exception has occurred.
*/
- public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
+ protected void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer, boolean appendDebug)
throws IOException {
- XLogService xLogService = Services.get().get(XLogService.class);
- if (xLogService.isAuditLogEnabled()) {
- new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(),
- xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
- }
- else {
- writer.write("Audit Log is disabled!!");
+ if (!logStreamer.isLogEnabled()) {
+ writer.write(logStreamer.getLogDisableMessage());
+ return;
}
- }
-
-
-
- public int getBufferLen() {
- return bufferLen;
+ logStreamer.streamLog(writer, startTime, endTime, appendDebug);
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
index 97771ad..3a5081c 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.commons.lang.StringUtils;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
@@ -37,7 +38,6 @@ import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.SimpleTimestampedMessageParser;
import org.apache.oozie.util.TimestampedMessageParser;
import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.ZKUtils;
@@ -93,119 +93,42 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
/**
* Stream the log of a job. It contacts any other running Oozie servers to collate relevant logs while streaming.
*
- * @param filter log streamer filter.
+ * @param logStreamer the log streamer
* @param startTime start time for log events to filter.
* @param endTime end time for log events to filter.
* @param writer writer to stream the log to.
- * @param params additional parameters from the request
* @throws IOException thrown if the log cannot be streamed.
*/
@Override
- public void streamLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
- throws IOException {
- XLogService xLogService = Services.get().get(XLogService.class);
- if (xLogService.getLogOverWS()) {
- // If ALL_SERVERS_PARAM is set to false, then only stream our log
- if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
- new XLogStreamer(filter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(),
- xLogService.getOozieLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
- }
- // Otherwise, we have to go collate relevant logs from the other Oozie servers
- else {
- collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(),
- xLogService.getOozieLogName(), xLogService.getOozieLogRotation(), RestConstants.JOB_SHOW_LOG);
- }
- }
- else {
- writer.write("Log streaming disabled!!");
- }
- }
+ public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException {
- /**
- * Stream the error log of a job. It contacts any other running Oozie servers to collate relevant error logs while streaming.
- *
- * @param filter log streamer filter.
- * @param startTime start time for log events to filter.
- * @param endTime end time for log events to filter.
- * @param writer writer to stream the log to.
- * @param params additional parameters from the request
- * @throws IOException thrown if the log cannot be streamed.
- */
- public void streamErrorLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
- throws IOException {
- XLogService xLogService = Services.get().get(XLogService.class);
- if (xLogService.isErrorLogEnabled()) {
- // If ALL_SERVERS_PARAM is set to false, then only stream our log
- if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
- new XLogStreamer(filter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(),
- xLogService.getOozieErrorLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
- }
- // Otherwise, we have to go collate relevant logs from the other Oozie servers
- else {
- collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieLogPath(),
- xLogService.getOozieErrorLogName(), xLogService.getOozieErrorLogRotation(),
- RestConstants.JOB_SHOW_ERROR_LOG);
- }
- }
- else {
- writer.write("Error Log streaming disabled!!");
+ if (!logStreamer.isLogEnabled()) {
+ writer.write(logStreamer.getLogDisableMessage());
+ return;
}
- }
-
- /**
- * Stream the audit log of a job. It contacts any other running Oozie servers to collate relevant audit logs while streaming.
- *
- * @param filter log streamer filter.
- * @param startTime start time for log events to filter.
- * @param endTime end time for log events to filter.
- * @param writer writer to stream the log to.
- * @param params additional parameters from the request
- * @throws IOException thrown if the log cannot be streamed.
- */
- @Override
- public void streamAuditLog(XLogFilter filter, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
- throws IOException {
- XLogService xLogService = Services.get().get(XLogService.class);
- if (xLogService.isAuditLogEnabled()) {
- // If ALL_SERVERS_PARAM is set to false, then only stream our log
- if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(params)) {
- new XLogStreamer(filter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(),
- xLogService.getOozieAuditLogRotation()).streamLog(writer, startTime, endTime, bufferLen);
- }
- // Otherwise, we have to go collate relevant logs from the other Oozie servers
- else {
- collateLogs(filter, startTime, endTime, writer, params, xLogService.getOozieAuditLogPath(),
- xLogService.getOozieAuditLogName(), xLogService.getOozieAuditLogRotation(),
- RestConstants.JOB_SHOW_AUDIT_LOG);
- }
+ // If ALL_SERVERS_PARAM is set to false, then only stream our log
+ if (!Services.get().get(JobsConcurrencyService.class).isAllServerRequest(logStreamer.getRequestParam())) {
+ super.streamLog(logStreamer, startTime, endTime, writer, false);
}
+ // Otherwise, we have to go collate relevant logs from the other Oozie servers
else {
- writer.write("Audit Log streaming disabled!!");
+ collateLogs(logStreamer, startTime, endTime, writer);
}
}
-
-
/**
* Contacts each of the other Oozie servers, gets their logs for the job, collates them, and sends them to the user via the
* Writer. It will make sure to not read all of the log messages into memory at the same time to not use up the heap. If there
* is a problem talking to one of the other servers, it will ignore that server and prepend a message to the Writer about it.
* For getting the logs from this server, it won't use the REST API and instead get them directly to be more efficient.
*
- * @param filter the job filter
+ * @param logStreamer the XLogStreamer
* @param startTime the job start time
* @param endTime the job end time
* @param writer the writer
- * @param params the params
- * @param logPath the log path
- * @param logName the log name
- * @param rotation the rotation
- * @param logType the log type
* @throws IOException Signals that an I/O exception has occurred.
*/
- private void collateLogs(XLogFilter filter, Date startTime, Date endTime, Writer writer,
- Map<String, String[]> params, String logPath, String logName, int rotation, final String logType) throws IOException {
- XLogService xLogService = Services.get().get(XLogService.class);
+ private void collateLogs(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer) throws IOException {
List<String> badOozies = new ArrayList<String>();
List<ServiceInstance<Map>> oozies = null;
try {
@@ -222,24 +145,28 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
String otherId = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
// If it's this server, we can just get them directly
if (otherId.equals(zk.getZKId())) {
- BufferedReader reader = new XLogStreamer(filter, logPath, logName, rotation).makeReader(startTime,
+ BufferedReader reader = logStreamer.makeReader(startTime,
endTime);
- parsers.add(new TimestampedMessageParser(reader, filter));
+ parsers.add(new TimestampedMessageParser(reader, logStreamer.getXLogFilter()));
}
// If it's another server, we'll have to use the REST API
else {
String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
- String jobId = filter.getFilterParams().get(DagXLogInfoService.JOB);
+ String jobId = logStreamer.getXLogFilter().getFilterParams().get(DagXLogInfoService.JOB);
try {
// It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
// Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
- + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logType
- + "&" + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(params);
+ + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logStreamer.getLogType()
+ + "&" + RestConstants.ALL_SERVER_REQUEST + "=false"
+ + AuthUrlClient.getQueryParamString(logStreamer.getRequestParam());
// remove doAs from url to avoid failure while fetching
// logs in case of HA mode
String key = "doAs";
- String[] value = params.get(key);
+ String[] value = null;
+ if (logStreamer.getRequestParam() != null) {
+ value = logStreamer.getRequestParam().get(key);
+ }
String urlWithoutdoAs = null;
if (value != null && value.length > 0 && value[0] != null && value[0].length() > 0) {
urlWithoutdoAs = url.replace("&" + key + "=" + URLEncoder.encode(value[0], "UTF-8"), "");
@@ -248,7 +175,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
urlWithoutdoAs = url;
}
BufferedReader reader = AuthUrlClient.callServer(urlWithoutdoAs);
- parsers.add(new SimpleTimestampedMessageParser(reader, filter));
+ parsers.add(new SimpleTimestampedMessageParser(reader, logStreamer.getXLogFilter()));
}
catch(IOException ioe) {
log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + otherId
@@ -259,10 +186,13 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
}
//If log param debug is set, we need to write start date and end date to outputstream.
- if(filter.isDebugMode()){
- writer.write(filter.getDebugMessage());
+ if(!StringUtils.isEmpty(logStreamer.getXLogFilter().getTruncatedMessage())){
+ writer.write(logStreamer.getXLogFilter().getTruncatedMessage());
}
+ if (logStreamer.getXLogFilter().isDebugMode()) {
+ writer.write(logStreamer.getXLogFilter().getDebugMessage());
+ }
// Add a message about any servers we couldn't contact
if (!badOozies.isEmpty()) {
writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
@@ -278,7 +208,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
// If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly
if (parsers.size() == 1) {
TimestampedMessageParser parser = parsers.get(0);
- parser.processRemaining(writer, bufferLen);
+ parser.processRemaining(writer, logStreamer);
}
else {
// Now that we have a Reader for each server to get the logs from that server, we have to collate them. Within each
@@ -292,16 +222,13 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
timestampMap.put(parser.getLastTimestamp(), parser);
}
}
- int bytesWritten = 0;
while (timestampMap.size() > 1) {
// The first entry will be the earliest based on the timestamp (also removes it) from the map
TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue();
// Write the message from that parser at that timestamp
writer.write(earliestParser.getLastMessage());
- bytesWritten = earliestParser.getLastMessage().length();
- if (bytesWritten > bufferLen) {
+ if (logStreamer.shouldFlushOutput(earliestParser.getLastMessage().length())) {
writer.flush();
- bytesWritten = 0;
}
// Increment that parser to read the next message
if (earliestParser.increment()) {
@@ -313,7 +240,7 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
if (timestampMap.size() == 1) {
TimestampedMessageParser parser = timestampMap.values().iterator().next();
writer.write(parser.getLastMessage()); // don't forget the last message read by the parser
- parser.processRemaining(writer, bufferLen, bytesWritten + parser.getLastMessage().length());
+ parser.processRemaining(writer, logStreamer);
}
}
}
@@ -321,7 +248,6 @@ public class ZKXLogStreamingService extends XLogStreamingService implements Serv
for (TimestampedMessageParser parser : parsers) {
parser.closeReader();
}
- writer.flush();
}
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
index d3b4689..39c7b85 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java
@@ -24,6 +24,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.BaseEngineException;
import org.apache.oozie.DagEngine;
import org.apache.oozie.DagEngineException;
import org.apache.oozie.client.rest.JsonBean;
@@ -192,7 +193,7 @@ public class V0JobServlet extends BaseJobServlet {
try {
dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
}
- catch (DagEngineException ex) {
+ catch (BaseEngineException ex) {
throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
index 9356768..10812c6 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java
@@ -990,7 +990,7 @@ public class V1JobServlet extends BaseJobServlet {
try {
dagEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
}
- catch (DagEngineException ex) {
+ catch (BaseEngineException ex) {
throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
}
}
@@ -1009,7 +1009,7 @@ public class V1JobServlet extends BaseJobServlet {
try {
bundleEngine.streamLog(jobId, response.getWriter(), request.getParameterMap());
}
- catch (BundleEngineException ex) {
+ catch (BaseEngineException ex) {
throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java b/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
index b45a96a..4fc8f57 100644
--- a/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
+++ b/core/src/main/java/org/apache/oozie/util/AuthUrlClient.java
@@ -37,11 +37,11 @@ import org.apache.hadoop.security.authentication.client.Authenticator;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.Services;
public class AuthUrlClient {
public static final String SERVER_SERVER_AUTH_TYPE = "oozie.server.authentication.type";
+ public static final String SERVER_SERVER_CONNECTION_TIMEOUT_SECONDS = "oozie.server.connection.timeout.seconds";
private static XLog LOG = XLog.getLog(AuthUrlClient.class);
@@ -129,6 +129,7 @@ public class AuthUrlClient {
@Override
public BufferedReader run() throws IOException {
HttpURLConnection conn = getConnection(url);
+ conn.setConnectTimeout(ConfigurationService.getInt(SERVER_SERVER_CONNECTION_TIMEOUT_SECONDS, 180) * 1000); // seconds -> ms
BufferedReader reader = null;
if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
InputStream is = conn.getInputStream();
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java b/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
index 830f287..7fa8a60 100644
--- a/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
+++ b/core/src/main/java/org/apache/oozie/util/TimestampedMessageParser.java
@@ -194,50 +194,24 @@ public class TimestampedMessageParser {
}
/**
- * Streams log messages to the passed in Writer. Flushes the log writing
- * based on buffer len
+ * Streams the remaining log messages to the passed in Writer, flushing
+ * whenever the given log streamer says the output should be flushed
*
* @param writer
- * @param bufferLen maximum len of log buffer
- * @param bytesWritten num bytes already written to writer
+ * @param logStreamer the log streamer
* @throws IOException
*/
- public void processRemaining(Writer writer, int bufferLen, int bytesWritten) throws IOException {
+ public void processRemaining(Writer writer, XLogStreamer logStreamer) throws IOException {
while (increment()) {
writer.write(lastMessage);
- bytesWritten += lastMessage.length();
- if (bytesWritten > bufferLen) {
+ if (logStreamer.shouldFlushOutput(lastMessage.length())) {
writer.flush();
- bytesWritten = 0;
}
}
writer.flush();
}
/**
- * Streams log messages to the passed in Writer, with zero bytes already
- * written
- *
- * @param writer
- * @param bufferLen maximum len of log buffer
- * @throws IOException
- */
- public void processRemaining(Writer writer, int bufferLen) throws IOException {
- processRemaining(writer, bufferLen, 0);
- }
-
- /**
- * Streams log messages to the passed in Writer, with default buffer len 4K
- * and zero bytes already written
- *
- * @param writer
- * @throws IOException
- */
- public void processRemaining(Writer writer) throws IOException {
- processRemaining(writer, Services.get().get(XLogStreamingService.class).getBufferLen());
- }
-
- /**
* Splits the log message into parts
*
* @param line
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java b/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java
index c377db5..465d808 100644
--- a/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java
+++ b/core/src/main/java/org/apache/oozie/util/XLogAuditFilter.java
@@ -17,8 +17,6 @@
*/
package org.apache.oozie.util;
-import java.io.IOException;
-import java.util.Date;
import java.util.regex.Pattern;
import org.apache.oozie.service.DagXLogInfoService;
@@ -29,20 +27,6 @@ public class XLogAuditFilter extends XLogFilter {
super(xLogUserFilterParam);
}
- @Override
- public void calculateAndValidateDateRange(Date jobStartTime, Date jobEndTime) throws IOException {
- // for testcase, otherwise jobStartTime and jobEndTime will be always set
- if (jobStartTime == null || jobEndTime == null) {
- return;
- }
-
- if (jobStartTime.after(jobEndTime)) {
- throw new IOException("Start time should be less than end time. startTime = " + jobStartTime
- + " endtime = " + jobEndTime);
- }
-
- }
-
public void constructPattern() {
// audit log will only need to scan on jobID
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java
new file mode 100644
index 0000000..6b27ae3
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/XLogAuditStreamer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
+
+public class XLogAuditStreamer extends XLogStreamer {
+ public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "audit.buffer.len";
+
+ public XLogAuditStreamer(XLogFilter logFilter, Map<String, String[]> requestParameters) {
+ super(logFilter, Services.get().get(XLogService.class).getOozieAuditLogPath(),
+ Services.get().get(XLogService.class).getOozieAuditLogName(),
+ Services.get().get(XLogService.class).getOozieAuditLogRotation());
+ this.requestParam = requestParameters;
+ bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 3);
+ }
+
+ public XLogAuditStreamer(Map<String, String[]> requestParameters) throws CommandException {
+ this(new XLogAuditFilter(new XLogUserFilterParam(requestParameters)), requestParameters);
+
+ }
+
+ @Override
+ protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException {
+ logFilter.calculateAndCheckDates(startTime, endTime);
+ // no validate
+ }
+
+ @Override
+ public boolean isLogEnabled() {
+ return Services.get().get(XLogService.class).isAuditLogEnabled();
+ }
+
+ @Override
+ public String getLogType() {
+ return RestConstants.JOB_SHOW_AUDIT_LOG;
+ }
+
+ @Override
+ public String getLogDisableMessage() {
+ return "Audit Log is disabled!!";
+ }
+
+ @Override
+ public boolean shouldFlushOutput(int byteCountIgnored) {
+ // Audit output is flushed per number of lines written, not per byte: the
+ // byte count is ignored and the buffer length is compared to a line counter.
+ this.totalDataWritten += 1;
+ if (this.totalDataWritten > getBufferLen()) {
+ this.totalDataWritten = 0;
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java
new file mode 100644
index 0000000..3395925
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/XLogErrorStreamer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
+
+public class XLogErrorStreamer extends XLogStreamer {
+
+ public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "error.buffer.len";
+
+ public XLogErrorStreamer(XLogFilter logFilter, Map<String, String[]> requestParameters) {
+ super(logFilter, Services.get().get(XLogService.class).getOozieErrorLogPath(), Services.get()
+ .get(XLogService.class).getOozieErrorLogName(), Services.get().get(XLogService.class)
+ .getOozieErrorLogRotation());
+ this.requestParam = requestParameters;
+ bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 2048);
+ }
+
+ public XLogErrorStreamer(Map<String, String[]> requestParameters) throws CommandException {
+ this(new XLogFilter(new XLogUserFilterParam(requestParameters)), requestParameters);
+ }
+
+ @Override
+ protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException {
+ logFilter.calculateAndCheckDates(startTime, endTime);
+ // no validate and truncating
+ }
+
+ @Override
+ public boolean isLogEnabled() {
+ return Services.get().get(XLogService.class).isErrorLogEnabled();
+ }
+
+ @Override
+ public String getLogType() {
+ return RestConstants.JOB_SHOW_ERROR_LOG;
+ }
+
+ @Override
+ public String getLogDisableMessage() {
+ return "Error Log is disabled!!";
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogFilter.java b/core/src/main/java/org/apache/oozie/util/XLogFilter.java
index cdf172b..505f945 100644
--- a/core/src/main/java/org/apache/oozie/util/XLogFilter.java
+++ b/core/src/main/java/org/apache/oozie/util/XLogFilter.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.oozie.util;
import java.io.IOException;
@@ -56,6 +55,7 @@ public class XLogFilter {
private boolean isActionList = false;
private String formattedEndDate;
private String formattedStartDate;
+ private String truncatedMessage;
// TODO Patterns to be read from config file
private static final String DEFAULT_REGEX = "[^\\]]*";
@@ -290,8 +290,9 @@ public class XLogFilter {
}
public String getDebugMessage() {
- return "Log start time = " + getStartDate() + ". Log end time = " + getEndDate() + ". User Log Filter = "
- + getUserLogFilter() + System.getProperty("line.separator");
+ return new StringBuilder("Log start time = ").append(getStartDate()).append(". Log end time = ")
+ .append(getEndDate()).append(". User Log Filter = ").append(getUserLogFilter())
+ .append(System.getProperty("line.separator")).toString();
}
public boolean isActionList() {
@@ -302,7 +303,20 @@ public class XLogFilter {
this.isActionList = isActionList;
}
- private void calculateScanDate(Date jobStartTime, Date jobEndTime) throws IOException {
+ /**
+ * Calculate scan date
+ *
+ * @param jobStartTime the job start time
+ * @param jobEndTime the job end time
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public void calculateAndCheckDates(Date jobStartTime, Date jobEndTime) throws IOException {
+
+ // for testcase, otherwise jobStartTime and jobEndTime will be always
+ // set
+ if (jobStartTime == null || jobEndTime == null) {
+ return;
+ }
if (userLogFilter.getStartDate() != null) {
startDate = userLogFilter.getStartDate();
@@ -311,14 +325,15 @@ public class XLogFilter {
startDate = adjustOffset(jobStartTime, userLogFilter.getStartOffset());
}
else {
- startDate = jobStartTime;
+ startDate = new Date(jobStartTime.getTime());
}
if (userLogFilter.getEndDate() != null) {
endDate = userLogFilter.getEndDate();
}
else if (userLogFilter.getEndOffset() != -1) {
- // If user has specified startdate as absolute then end offset will be on user start date,
+ // If user has specified startdate as absolute then end offset will
+ // be on user start date,
// else end offset will be calculated on job startdate.
if (userLogFilter.getStartDate() != null) {
endDate = adjustOffset(startDate, userLogFilter.getEndOffset());
@@ -328,14 +343,14 @@ public class XLogFilter {
}
}
else {
- endDate = jobEndTime;
+ endDate = new Date(jobEndTime.getTime());
}
// if recent offset is specified then start time = endtime - offset
if (getUserLogFilter().getRecent() != -1) {
startDate = adjustOffset(endDate, userLogFilter.getRecent() * -1);
}
- //add buffer iff dates are not asbsolute
+ // add buffer if dates are not absolute
if (userLogFilter.getStartDate() == null) {
startDate = adjustOffset(startDate, -LOG_TIME_BUFFER);
}
@@ -345,26 +360,27 @@ public class XLogFilter {
formattedEndDate = XLogUserFilterParam.dt.get().format(getEndDate());
formattedStartDate = XLogUserFilterParam.dt.get().format(getStartDate());
+
+ if (startDate.after(endDate)) {
+ throw new IOException(
+ "Start time should be less than end time. startTime = " + startDate + " endTime = " + endDate);
+ }
}
/**
- * Calculate and validate date range.
+ * Validates the date range, truncating the scan window when it exceeds the configured max scan duration.
*
* @param jobStartTime the job start time
* @param jobEndTime the job end time
* @throws IOException Signals that an I/O exception has occurred.
*/
- public void calculateAndValidateDateRange(Date jobStartTime, Date jobEndTime) throws IOException {
- // for testcase, otherwise jobStartTime and jobEndTime will be always set
+ public void validateDateRange(Date jobStartTime, Date jobEndTime) throws IOException {
+ // for testcase, otherwise jobStartTime and jobEndTime will be always
+ // set
if (jobStartTime == null || jobEndTime == null) {
return;
}
- calculateScanDate(jobStartTime, jobEndTime);
- if (startDate.after(endDate)) {
- throw new IOException("Start time should be less than end time. startTime = " + startDate + " endtime = "
- + endDate);
- }
long diffHours = (endDate.getTime() - startDate.getTime()) / (60 * 60 * 1000);
if (isActionList) {
int actionLogDuration = ConfigurationService.getInt(MAX_ACTIONLIST_SCAN_DURATION);
@@ -372,10 +388,9 @@ public class XLogFilter {
return;
}
if (diffHours > actionLogDuration) {
- throw new IOException(
- "Request log streaming time range with action list is higher than configured. Please reduce the scan "
- + "time range. Input range (hours) = " + diffHours
- + " system allowed (hours) with action list = " + actionLogDuration);
+ setTruncatedMessage("Truncated logs to max log scan duration " + actionLogDuration + " hrs");
+ startDate = adjustOffset(endDate, -1 * actionLogDuration * 60);
+ startDate = adjustOffset(startDate, -1 * LOG_TIME_BUFFER);
}
}
else {
@@ -384,14 +399,27 @@ public class XLogFilter {
return;
}
if (diffHours > logDuration) {
- throw new IOException(
- "Request log streaming time range is higher than configured. Please reduce the scan time range. For coord"
- + " jobs you can provide action list to reduce log scan time range. Input range (hours) = "
- + diffHours + " system allowed (hours) = " + logDuration);
+ setTruncatedMessage("Truncated logs to max log scan duration " + logDuration + " hrs");
+ startDate = adjustOffset(endDate, -1 * logDuration * 60);
+ startDate = adjustOffset(startDate, -1 * LOG_TIME_BUFFER);
}
}
}
+ protected void setTruncatedMessage(String message) {
+ truncatedMessage = message;
+
+ }
+
+ public String getTruncatedMessage() {
+ if (StringUtils.isEmpty(truncatedMessage)) {
+ return truncatedMessage;
+ }
+ else {
+ return truncatedMessage + System.getProperty("line.separator");
+ }
+ }
+
/**
* Adjust offset, offset will always be in min.
*
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/XLogStreamer.java b/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
index 19f1fee..012fbef 100644
--- a/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
+++ b/core/src/main/java/org/apache/oozie/util/XLogStreamer.java
@@ -25,28 +25,64 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.io.BufferedReader;
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.Service;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogService;
+
/**
* XLogStreamer streams the given log file to writer after applying the given filter.
*/
public class XLogStreamer {
private static XLog LOG = XLog.getLog(XLogStreamer.class);
+ protected static final String CONF_PREFIX = Service.CONF_PREFIX + "XLogStreamingService.";
+ public static final String STREAM_BUFFER_LEN = CONF_PREFIX + "buffer.len";
+
private String logFile;
private String logPath;
- private XLogFilter logFilter;
+ protected XLogFilter logFilter;
private long logRotation;
+ Map<String, String[]> requestParam;
+ protected int totalDataWritten;
+ protected int bufferLen;
public XLogStreamer(XLogFilter logFilter, String logPath, String logFile, long logRotationSecs) {
- this.logFilter = logFilter;
if (logFile == null) {
logFile = "oozie-app.log";
}
+
+ this.logFilter = logFilter;
this.logFile = logFile;
this.logPath = logPath;
this.logRotation = logRotationSecs * 1000l;
+ bufferLen = ConfigurationService.getInt(STREAM_BUFFER_LEN, 4096);
+ }
+
+ public XLogStreamer(XLogFilter logFilter) {
+ this(logFilter, Services.get().get(XLogService.class).getOozieLogPath(), Services.get().get(XLogService.class)
+ .getOozieLogName(), Services.get().get(XLogService.class).getOozieLogRotation());
+
+ }
+
+ public XLogStreamer(XLogFilter logFilter, Map<String, String[]> params) {
+ this(logFilter, Services.get().get(XLogService.class).getOozieLogPath(), Services.get().get(XLogService.class)
+ .getOozieLogName(), Services.get().get(XLogService.class).getOozieLogRotation());
+ this.requestParam = params;
+
+ }
+
+
+ public XLogStreamer(Map<String, String[]> params) throws CommandException {
+ this(new XLogFilter(new XLogUserFilterParam(params)));
+ this.requestParam = params;
}
/**
@@ -58,15 +94,34 @@ public class XLogStreamer {
* @param endTime
* @throws IOException
*/
- public void streamLog(Writer writer, Date startTime, Date endTime, int bufferLen) throws IOException {
+ public void streamLog(Writer writer, Date startTime, Date endTime) throws IOException {
+ streamLog(writer, startTime, endTime, true);
+ }
+
+ /**
+ * Gets the files that are modified between startTime and endTime in the given logPath and streams the log after
+ * applying the filters
+ *
+ * @param writer the writer
+ * @param startTime the start time
+ * @param endTime the end time
+ * @param appendDebug whether to write the truncation and debug messages before the log lines
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ public void streamLog(Writer writer, Date startTime, Date endTime, boolean appendDebug) throws IOException {
// Get a Reader for the log file(s)
BufferedReader reader = new BufferedReader(getReader(startTime, endTime));
try {
- if(logFilter.isDebugMode()){
- writer.write(logFilter.getDebugMessage());
+ if (appendDebug) {
+ if (!StringUtils.isEmpty(logFilter.getTruncatedMessage())) {
+ writer.write(logFilter.getTruncatedMessage());
+ }
+ if (logFilter.isDebugMode()) {
+ writer.write(logFilter.getDebugMessage());
+ }
}
// Process the entire logs from the reader using the logFilter
- new TimestampedMessageParser(reader, logFilter).processRemaining(writer, bufferLen);
+ new TimestampedMessageParser(reader, logFilter).processRemaining(writer, this);
}
finally {
reader.close();
@@ -83,12 +138,17 @@ public class XLogStreamer {
*/
private MultiFileReader getReader(Date startTime, Date endTime) throws IOException {
- logFilter.calculateAndValidateDateRange(startTime, endTime);
+ calculateAndValidateDateRange(startTime, endTime);
return new MultiFileReader(getFileList(logFilter.getStartDate(), logFilter.getEndDate()));
}
+ protected void calculateAndValidateDateRange(Date startTime, Date endTime) throws IOException {
+ logFilter.calculateAndCheckDates(startTime, endTime);
+ logFilter.validateDateRange(startTime, endTime);
+ }
+
public BufferedReader makeReader(Date startTime, Date endTime) throws IOException {
- return new BufferedReader(getReader(startTime,endTime));
+ return new BufferedReader(getReader(startTime, endTime));
}
/**
@@ -220,7 +280,8 @@ public class XLogStreamer {
LOG.warn("oozie.log has been GZipped, which is unexpected");
// Return a value other than -1 to include the file in list
returnVal = 0;
- } else {
+ }
+ else {
Matcher m = gzTimePattern.matcher(fileName);
if (m.matches() && m.groupCount() == 4) {
int year = Integer.parseInt(m.group(1));
@@ -242,11 +303,47 @@ public class XLogStreamer {
|| (startTime <= logFileStartTime && endTime >= logFileEndTime)) {
returnVal = logFileStartTime;
}
- } else {
+ }
+ else {
LOG.debug("Filename " + fileName + " does not match the expected format");
returnVal = -1;
}
}
return returnVal;
}
+
+ public boolean isLogEnabled() {
+ return Services.get().get(XLogService.class).getLogOverWS();
+ }
+
+ public String getLogType() {
+ return RestConstants.JOB_SHOW_LOG;
+ }
+
+ public XLogFilter getXLogFilter() {
+ return logFilter;
+ }
+
+ public String getLogDisableMessage() {
+ return "Log streaming disabled!!";
+ }
+
+ public Map<String, String[]> getRequestParam() {
+ return requestParam;
+ }
+
+ public boolean shouldFlushOutput(int writtenBytes) {
+ this.totalDataWritten += writtenBytes;
+ if (this.totalDataWritten > getBufferLen()) {
+ this.totalDataWritten = 0;
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+ public int getBufferLen() {
+ return bufferLen;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index ae384b4..e7a48a0 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -192,7 +192,23 @@
<property>
<name>oozie.service.XLogStreamingService.buffer.len</name>
<value>4096</value>
- <description>4K buffer for streaming the logs progressively</description>
+ <description>4K buffer for streaming the logs progressively
+ </description>
+ </property>
+ <property>
+ <name>oozie.service.XLogStreamingService.error.buffer.len</name>
+ <value>2048</value>
+ <description>2K buffer for streaming the error logs
+ progressively
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.XLogStreamingService.audit.buffer.len</name>
+ <value>3</value>
+ <description>Number of lines for streaming the audit logs
+ progressively
+ </description>
</property>
<!-- HCatAccessorService -->
@@ -2223,6 +2239,14 @@ will be the requeue interval for the actions which are waiting for a long time w
</property>
<property>
+ <name>oozie.server.connection.timeout.seconds</name>
+ <value>180</value>
+ <description>
+ Connection timeout, in seconds, used when an Oozie server communicates with another Oozie server over HTTP(S). Default is 180 seconds (3 minutes).
+ </description>
+ </property>
+
+ <property>
<name>oozie.authentication.token.validity</name>
<value>36000</value>
<description>
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
index 3eb1016..d6f1aef 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
@@ -26,13 +26,9 @@ import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.Job;
import org.apache.oozie.client.rest.RestConstants;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
@@ -40,6 +36,7 @@ import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
public class TestCoordinatorEngineStreamLog extends XDataTestCase {
private Services services;
@@ -63,9 +60,9 @@ public class TestCoordinatorEngineStreamLog extends XDataTestCase {
Date endTime;
@Override
- public void streamLog(XLogFilter filter1, Date startTime, Date endTime, Writer writer, Map<String, String[]> params)
+ public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer)
throws IOException {
- filter = filter1;
+ filter = logStreamer.getXLogFilter();
this.startTime = startTime;
this.endTime = endTime;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
index 2a3d3d3..3c6525d 100644
--- a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
@@ -36,6 +36,7 @@ import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
import java.io.File;
@@ -176,7 +177,7 @@ public class TestConfigurationService extends XTestCase {
assertEquals("http://0.0.0.0:11000/oozie/callback", ConfigurationService.get(CallbackService.CONF_BASE_URL));
assertEquals(5, ConfigurationService.getInt(CallbackService.CONF_EARLY_REQUEUE_MAX_RETRIES));
assertEquals("gz", ConfigurationService.get(CodecFactory.COMPRESSION_OUTPUT_CODEC));
- assertEquals(4096, ConfigurationService.getInt(XLogStreamingService.STREAM_BUFFER_LEN));
+ assertEquals(4096, ConfigurationService.getInt(XLogStreamer.STREAM_BUFFER_LEN));
assertEquals(10000, ConfigurationService.getLong(JvmPauseMonitorService.WARN_THRESHOLD_KEY));
assertEquals(60, ConfigurationService.getInt(InstrumentationService.CONF_LOGGING_INTERVAL));
assertEquals(30, ConfigurationService.getInt(PurgeService.CONF_OLDER_THAN));
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
index bebb678..1921f1b 100644
--- a/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestXLogStreamingService.java
@@ -23,18 +23,23 @@ import org.apache.oozie.command.CommandException;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogAuditFilter;
+import org.apache.oozie.util.XLogAuditStreamer;
+import org.apache.oozie.util.XLogErrorStreamer;
import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.XLogUserFilterParam;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.StringWriter;
-import java.util.HashMap;
+import java.util.Date;
import java.util.Properties;
public class TestXLogStreamingService extends XTestCase {
+ final int FIFTEEN_HOURS = 60 * 60 * 1000 * 15;
+
@Override
protected void setUp() throws Exception {
super.setUp();
@@ -386,6 +391,28 @@ public class TestXLogStreamingService extends XTestCase {
}
}
+ /**
+ * Checks which log streams report truncation when the requested window is
+ * longer than the configured maximum scan duration: the regular job log is
+ * expected to carry the truncation notice, the error and audit logs are not.
+ */
+ public void testTuncateLog() throws Exception {
+ // Build a per-test log4j configuration from the bundled "no dash" template.
+ File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+ Properties log4jProps = new Properties();
+ log4jProps.load(is);
+ // Point the oozie appender at a file inside this test case's directory.
+ log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+ log4jProps.store(new FileOutputStream(log4jFile), "");
+ setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+
+ new Services().init();
+ // NOTE(review): unit of MAX_SCAN_DURATION is presumably hours (10 < 15-hour window); confirm in XLogFilter.
+ ConfigurationService.set(XLogFilter.MAX_SCAN_DURATION, "10");
+ // Request a 15-hour window starting now.
+ Date startDate = new Date();
+ Date endDate = new Date(startDate.getTime() + FIFTEEN_HOURS);
+ String log = doStreamLog(new XLogFilter(), startDate, endDate);
+ assertTrue(log.contains("Truncated logs to max log scan duration"));
+ log = doStreamErrorLog(new XLogFilter(), startDate, endDate);
+ assertFalse(log.contains("Truncated logs to max log scan duration"));
+ log = doStreamAuditLog(new XLogFilter(), startDate, endDate);
+ assertFalse(log.contains("Truncated logs to max log scan duration"));
+ }
+
private boolean doStreamDisabledCheckWithServices() throws Exception {
boolean result = false;
try {
@@ -405,21 +432,35 @@ public class TestXLogStreamingService extends XTestCase {
}
private String doStreamLog(XLogFilter xf) throws Exception {
- StringWriter w = new StringWriter();
- Services.get().get(XLogStreamingService.class).streamLog(xf, null, null, w, new HashMap<String, String[]>());
- return w.toString();
+ return doStreamLog(new XLogStreamer(xf), null, null);
}
+
private String doStreamErrorLog(XLogFilter xf) throws Exception {
- StringWriter w = new StringWriter();
- Services.get().get(XLogStreamingService.class).streamErrorLog(xf, null, null, w, new HashMap<String, String[]>());
- return w.toString();
+ return doStreamLog(new XLogErrorStreamer(xf, null), null, null);
}
+
private String doStreamAuditLog(XLogFilter xf) throws Exception {
+ return doStreamLog(new XLogAuditStreamer(xf, null), null, null);
+ }
+
+ private String doStreamLog(XLogStreamer logStreamer, Date startDate, Date endDate) throws Exception {
StringWriter w = new StringWriter();
- Services.get().get(XLogStreamingService.class).streamAuditLog(xf, null, null, w, new HashMap<String, String[]>());
+ Services.get().get(XLogStreamingService.class)
+ .streamLog(logStreamer, startDate, endDate, w);
return w.toString();
}
+ /**
+ * Streams the regular log for {@code xf} over the given range by wrapping the
+ * filter in an XLogStreamer (request parameters passed as null).
+ *
+ * @return everything the streaming service wrote
+ */
+ private String doStreamLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
+ return doStreamLog(new XLogStreamer(xf, null), startDate, endDate);
+ }
+
+ /**
+ * Streams the error log for {@code xf} over the given range by wrapping the
+ * filter in an XLogErrorStreamer (request parameters passed as null).
+ *
+ * @param xf filter to stream with
+ * @param startDate start of the requested range
+ * @param endDate end of the requested range
+ * @return everything the streaming service wrote
+ * @throws Exception propagated from the streaming call
+ */
+ private String doStreamErrorLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
+ // Forward the caller's range; passing null here would silently discard it.
+ return doStreamLog(new XLogErrorStreamer(xf, null), startDate, endDate);
+ }
+
+ /**
+ * Streams the audit log for {@code xf} over the given range by wrapping the
+ * filter in an XLogAuditStreamer (request parameters passed as null).
+ *
+ * @param xf filter to stream with
+ * @param startDate start of the requested range
+ * @param endDate end of the requested range
+ * @return everything the streaming service wrote
+ * @throws Exception propagated from the streaming call
+ */
+ private String doStreamAuditLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
+ // Forward the caller's range; passing null here would silently discard it.
+ return doStreamLog(new XLogAuditStreamer(xf, null), startDate, endDate);
+ }
private boolean doErrorStreamDisabledCheck() throws Exception {
XLogFilter xf = new XLogFilter(new XLogUserFilterParam(null));
http://git-wip-us.apache.org/repos/asf/oozie/blob/993f06df/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
index fca8d84..df097fd 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
@@ -32,7 +32,9 @@ import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.test.EmbeddedServletContainer;
import org.apache.oozie.test.ZKXTestCase;
import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XLogErrorStreamer;
import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.ZKUtils;
public class TestZKXLogStreamingService extends ZKXTestCase {
@@ -198,15 +200,29 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
return doStreamLog(xf, new HashMap<String, String[]>());
}
+ /**
+ * Streams the regular (non-error) log for {@code xf} over the given range,
+ * using an empty request-parameter map.
+ */
+ protected String doStreamLog(XLogFilter xf, Date startTime, Date endTime) throws Exception {
+ return doStreamLog(xf, new HashMap<String, String[]>(), false, startTime, endTime);
+ }
+
protected String doStreamErrorLog(XLogFilter xf) throws Exception {
return doStreamLog(xf, new HashMap<String, String[]>(), true);
}
+ /**
+ * Streams the error log for {@code xf} over the given range, using an empty
+ * request-parameter map.
+ *
+ * @param xf filter to stream with
+ * @param startDate start of the requested range
+ * @param endDate end of the requested range
+ * @return everything the streaming service wrote
+ * @throws Exception propagated from the streaming call
+ */
+ private String doStreamErrorLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
+ // Pass endDate as the range end; using startDate twice yields a zero-length range.
+ return doStreamLog(xf, new HashMap<String, String[]>(), true, startDate, endDate);
+ }
+
+
protected String doStreamLog(XLogFilter xf, Map<String, String[]> param) throws Exception {
return doStreamLog(xf, param, false);
}
protected String doStreamLog(XLogFilter xf, Map<String, String[]> param, boolean isErrorLog) throws Exception {
+ return doStreamLog(xf, param, isErrorLog, null, null);
+ }
+
+ protected String doStreamLog(XLogFilter xf, Map<String, String[]> param, boolean isErrorLog, Date startTime,
+ Date endTime) throws Exception {
StringWriter w = new StringWriter();
ZKXLogStreamingService zkxlss = new ZKXLogStreamingService();
try {
@@ -215,10 +231,10 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
zkxlss.init(services);
sleep(1000); // Sleep to allow ZKUtils ServiceCache to update
if (isErrorLog) {
- zkxlss.streamErrorLog(xf, null, null, w, param);
+ zkxlss.streamLog(new XLogErrorStreamer(xf, param), startTime, endTime, w);
}
else {
- zkxlss.streamLog(xf, null, null, w, param);
+ zkxlss.streamLog(new XLogStreamer(xf, param), startTime, endTime, w);
}
}
finally {
@@ -583,4 +599,30 @@ public class TestZKXLogStreamingService extends ZKXTestCase {
}
}
+ /**
+ * Checks which log streams report truncation when the requested window is
+ * longer than the configured maximum scan duration: the regular job log is
+ * expected to carry the truncation notice, the error log is not.
+ */
+ public void testTuncateLog() throws Exception {
+ XLogFilter.reset();
+ // Build a per-test log4j configuration from the bundled "no dash" template.
+ File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
+ Properties log4jProps = new Properties();
+ log4jProps.load(is);
+ // prevent conflicts with other tests by changing the log file location
+ log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
+ log4jProps.store(new FileOutputStream(log4jFile), "");
+ setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
+ assertFalse(doStreamDisabledCheck());
+ // Make sure the directory that will hold the oozie log exists.
+ File logFile = new File(Services.get().get(XLogService.class).getOozieLogPath(),
+ Services.get().get(XLogService.class).getOozieLogName());
+ logFile.getParentFile().mkdirs();
+
+ // NOTE(review): unit of MAX_SCAN_DURATION is presumably hours (1 < 15-hour window); confirm in XLogFilter.
+ ConfigurationService.set(XLogFilter.MAX_SCAN_DURATION, "1");
+ // Request a 15-hour window starting now.
+ Date startDate = new Date();
+ Date endDate = new Date(startDate.getTime() + 60 * 60 * 1000 * 15);
+
+ String log = doStreamLog(new XLogFilter(), startDate, endDate);
+ assertTrue(log.contains("Truncated logs to max log scan duration"));
+ String logError = doStreamErrorLog(new XLogFilter(), startDate, endDate);
+ assertFalse(logError.contains("Truncated logs to max log scan duration"));
+ }
+
}