You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/03/03 16:40:11 UTC
[1/2] apex-core git commit: APEXCORE-563: Add log filename and offset
to the container/operator error events
Repository: apex-core
Updated Branches:
refs/heads/master 1e4785671 -> 8ce340ef3
APEXCORE-563: Add log filename and offset to the container/operator error events
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/764ca7b2
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/764ca7b2
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/764ca7b2
Branch: refs/heads/master
Commit: 764ca7b285d5af4305b5c792cc1d9154da2f0c0d
Parents: a6dd73b
Author: Priyanka Gugale <pr...@apache.org>
Authored: Tue Dec 13 17:22:47 2016 +0530
Committer: priya <pr...@apache.org>
Committed: Fri Mar 3 10:33:57 2017 +0530
----------------------------------------------------------------------
.../datatorrent/stram/StramLocalCluster.java | 5 +-
.../stram/StreamingContainerParent.java | 9 +-
.../com/datatorrent/stram/api/StramEvent.java | 58 ++++++++--
.../StreamingContainerUmbilicalProtocol.java | 5 +-
.../stram/engine/StreamingContainer.java | 14 ++-
.../com/datatorrent/stram/util/LoggerUtil.java | 77 +++++++++++++
.../org/apache/apex/log/LogFileInformation.java | 45 ++++++++
.../datatorrent/stram/StramRecoveryTest.java | 6 +-
.../stram/util/LogFileInformationTest.java | 111 +++++++++++++++++++
9 files changed, 308 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 4d452af..7ebeea6 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.apex.log.LogFileInformation;
+
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
@@ -105,7 +108,7 @@ public class StramLocalCluster implements Runnable, Controller
}
@Override
- public void reportError(String containerId, int[] operators, String msg) throws IOException
+ public void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException
{
log(containerId, msg);
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
index e5d8a97..76f89bd 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
@@ -24,6 +24,8 @@ import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.log.LogFileInformation;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -172,15 +174,16 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
}
@Override
- public void reportError(String containerId, int[] operators, String msg) throws IOException
+ public void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException
{
if (operators == null || operators.length == 0) {
- dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg));
+ dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg, logFileInfo));
} else {
for (int operator : operators) {
OperatorInfo operatorInfo = dagManager.getOperatorInfo(operator);
if (operatorInfo != null) {
- dagManager.recordEventAsync(new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg));
+ dagManager.recordEventAsync(new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg,
+ logFileInfo));
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
index e9c5f13..d80fa94 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
@@ -20,6 +20,8 @@ package com.datatorrent.stram.api;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.apex.log.LogFileInformation;
+
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
/**
@@ -35,13 +37,20 @@ public abstract class StramEvent
private long timestamp = System.currentTimeMillis();
private String reason;
private LogLevel logLevel;
+ private LogFileInformation logFileInformation;
public abstract String getType();
protected StramEvent(LogLevel logLevel)
{
+ this(logLevel, null);
+ }
+
+ protected StramEvent(LogLevel logLevel, LogFileInformation logFileInformation)
+ {
id = nextId.getAndIncrement();
this.logLevel = logLevel;
+ this.logFileInformation = logFileInformation;
}
public long getId()
@@ -74,6 +83,21 @@ public abstract class StramEvent
return logLevel;
}
+ public String getLogFileName()
+ {
+ return logFileInformation != null ? logFileInformation.fileName : null;
+ }
+
+ public long getLogFileOffset()
+ {
+ return logFileInformation != null ? logFileInformation.fileOffset : 0;
+ }
+
+ public void setLogFileInfomation(LogFileInformation logFileInformation)
+ {
+ this.logFileInformation = logFileInformation;
+ }
+
public static enum LogLevel
{
TRACE,
@@ -90,7 +114,12 @@ public abstract class StramEvent
public OperatorEvent(String operatorName, LogLevel logLevel)
{
- super(logLevel);
+ this(operatorName, logLevel, null);
+ }
+
+ public OperatorEvent(String operatorName, LogLevel logLevel, LogFileInformation logFileInformation)
+ {
+ super(logLevel, logFileInformation);
this.operatorName = operatorName;
}
@@ -202,7 +231,13 @@ public abstract class StramEvent
public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel)
{
- super(operatorName, logLevel);
+ this(operatorName, operatorId, logLevel, null);
+ }
+
+ public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel,
+ LogFileInformation logFileInformation)
+ {
+ super(operatorName, logLevel, logFileInformation);
this.operatorId = operatorId;
}
@@ -492,14 +527,16 @@ public abstract class StramEvent
private String containerId;
private String errorMessage;
- public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage)
+ public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage,
+ LogFileInformation logFileInformation)
{
- this(operatorName, operatorId, containerId, errorMessage, LogLevel.ERROR);
+ this(operatorName, operatorId, containerId, errorMessage, logFileInformation, LogLevel.ERROR);
}
- public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage, LogLevel logLevel)
+ public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage,
+ LogFileInformation logFileInformation, LogLevel logLevel)
{
- super(operatorName, operatorId, logLevel);
+ super(operatorName, operatorId, logLevel, logFileInformation);
this.containerId = containerId;
this.errorMessage = errorMessage;
}
@@ -537,14 +574,15 @@ public abstract class StramEvent
private String containerId;
private String errorMessage;
- public ContainerErrorEvent(String containerId, String errorMessage)
+ public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation)
{
- this(containerId, errorMessage, LogLevel.ERROR);
+ this(containerId, errorMessage, logFileInformation, LogLevel.ERROR);
}
- public ContainerErrorEvent(String containerId, String errorMessage, LogLevel logLevel)
+ public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation,
+ LogLevel logLevel)
{
- super(logLevel);
+ super(logLevel, logFileInformation);
this.containerId = containerId;
this.errorMessage = errorMessage;
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
index 13832ba..b78e8f2 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
@@ -23,6 +23,8 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.apache.apex.log.LogFileInformation;
+
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -430,8 +432,9 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
* @param containerId
* @param operators
* @param msg
+ * @param logFileInfo
*/
- void reportError(String containerId, int[] operators, String msg) throws IOException;
+ void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException;
/**
* To be called periodically by child for heartbeat protocol.
http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 38963c4..437070c 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.log.LogFileInformation;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
@@ -315,10 +316,11 @@ public class StreamingContainer extends YarnContainerMain
stramChild.teardown();
}
} catch (Error | Exception e) {
+ LogFileInformation logFileInfo = LoggerUtil.getLogFileInformation();
logger.error("Fatal {} in container!", (e instanceof Error) ? "Error" : "Exception", e);
/* Report back any failures, for diagnostic purposes */
try {
- umbilical.reportError(childId, null, ExceptionUtils.getStackTrace(e));
+ umbilical.reportError(childId, null, ExceptionUtils.getStackTrace(e), logFileInfo);
} catch (Exception ex) {
logger.debug("Fail to log", ex);
}
@@ -1426,6 +1428,8 @@ public class StreamingContainer extends YarnContainerMain
node.run(); /* this is a blocking call */
} catch (Error error) {
int[] operators;
+ //fetch logFileInfo before logging exception, to get offset before exception
+ LogFileInformation logFileInfo = LoggerUtil.getLogFileInformation();
if (currentdi == null) {
logger.error("Voluntary container termination due to an error in operator set {}.", setOperators, error);
operators = new int[setOperators.size()];
@@ -1438,19 +1442,21 @@ public class StreamingContainer extends YarnContainerMain
operators = new int[]{currentdi.id};
}
try {
- umbilical.reportError(containerId, operators, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace(error));
+ umbilical.reportError(containerId, operators, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace(error), logFileInfo);
} catch (Exception e) {
logger.debug("Fail to log", e);
} finally {
System.exit(1);
}
} catch (Exception ex) {
+ //fetch logFileInfo before logging exception, to get offset before exception
+ LogFileInformation logFileInfo = LoggerUtil.getLogFileInformation();
if (currentdi == null) {
failedNodes.add(ndi.id);
logger.error("Operator set {} stopped running due to an exception.", setOperators, ex);
int[] operators = new int[]{ndi.id};
try {
- umbilical.reportError(containerId, operators, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace(ex));
+ umbilical.reportError(containerId, operators, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace(ex), logFileInfo);
} catch (Exception e) {
logger.debug("Fail to log", e);
}
@@ -1459,7 +1465,7 @@ public class StreamingContainer extends YarnContainerMain
logger.error("Abandoning deployment of operator {} due to setup failure.", currentdi, ex);
int[] operators = new int[]{currentdi.id};
try {
- umbilical.reportError(containerId, operators, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace(ex));
+ umbilical.reportError(containerId, operators, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace(ex), logFileInfo);
} catch (Exception e) {
logger.debug("Fail to log", e);
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
index d30958c..ffe9c8c 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
@@ -18,6 +18,7 @@
*/
package com.datatorrent.stram.util;
+import java.io.File;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
@@ -26,8 +27,11 @@ import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.apex.log.LogFileInformation;
+
import org.apache.log4j.Appender;
import org.apache.log4j.Category;
+import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -36,6 +40,7 @@ import org.apache.log4j.spi.HierarchyEventListener;
import org.apache.log4j.spi.LoggerFactory;
import org.apache.log4j.spi.LoggerRepository;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -182,9 +187,19 @@ public class LoggerUtil
}
}
+ private static FileAppender fileAppender;
+ private static boolean shouldFetchLogFileInfo;
static {
logger.debug("initializing LoggerUtil");
+ initializeLogger();
+ }
+
+ @VisibleForTesting
+ static void initializeLogger()
+ {
LogManager.setRepositorySelector(new DefaultRepositorySelector(new DelegatingLoggerRepository(LogManager.getLoggerRepository())), null);
+ fileAppender = getFileAppender();
+ shouldFetchLogFileInfo = shouldFetchLogFileInformation();
}
private static synchronized Level getLevelFor(String name)
@@ -278,4 +293,66 @@ public class LoggerUtil
}
return ImmutableMap.copyOf(matchedClasses);
}
+
+ /**
+ * Returns logger log file {@link LogFileInformation}
+ * @return logFileInformation
+ */
+ public static LogFileInformation getLogFileInformation()
+ {
+ if (shouldFetchLogFileInfo) {
+ File logFile = new File(fileAppender.getFile());
+ LogFileInformation logFileInfo = new LogFileInformation(fileAppender.getFile(), logFile.length());
+ return logFileInfo;
+ }
+ return null;
+ }
+
+ private static FileAppender getFileAppender()
+ {
+ Enumeration<Appender> e = LogManager.getRootLogger().getAllAppenders();
+ FileAppender fileAppender = null;
+ while (e.hasMoreElements()) {
+ Appender appender = e.nextElement();
+ if (appender instanceof FileAppender) {
+ if (fileAppender == null) {
+ fileAppender = (FileAppender)appender;
+ } else {
+ //skip fetching log file information if we have multiple file Appenders
+ return null;
+ }
+ }
+ }
+ return fileAppender;
+ }
+
+ /*
+ * We should return log file information only if,
+ * we have single file Appender, the logging level of appender is set to level Error or above and immediateFlush is set to true.
+ * In future we should be able to enhance this feature to support multiple file appenders.
+ */
+ private static boolean shouldFetchLogFileInformation()
+ {
+ if (fileAppender != null && isErrorLevelEnable() && fileAppender.getImmediateFlush()) {
+ return true;
+ }
+ logger.warn(
+ "Log information is unavailable. To enable log information log4j/logging should be configured with single FileAppender that has immediateFlush set to true and log level set to ERROR or greater.");
+ return false;
+ }
+
+ private static boolean isErrorLevelEnable()
+ {
+ if (fileAppender != null) {
+ Level p = (Level)fileAppender.getThreshold();
+ if (p == null) {
+ p = LogManager.getRootLogger().getLevel();
+ }
+ if (p != null) {
+ return Level.ERROR.isGreaterOrEqual(p);
+ }
+ }
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/org/apache/apex/log/LogFileInformation.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/log/LogFileInformation.java b/engine/src/main/java/org/apache/apex/log/LogFileInformation.java
new file mode 100644
index 0000000..841d847
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/log/LogFileInformation.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.log;
+
+import com.datatorrent.stram.util.AbstractWritableAdapter;
+
+public class LogFileInformation extends AbstractWritableAdapter
+{
+
+ private static final long serialVersionUID = 1L;
+ public String fileName;
+ public long fileOffset;
+
+ public LogFileInformation()
+ {
+ }
+
+ public LogFileInformation(String fileName, long fileOffset)
+ {
+ this.fileName = fileName;
+ this.fileOffset = fileOffset;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "LogFileInformation [fileName=" + fileName + ", fileOffset=" + fileOffset + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index e8ec26c..8a44508 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -494,7 +494,7 @@ public class StramRecoveryTest
}
};
Mockito.doAnswer(answer).when(impl).log("containerId", "timeout");
- Mockito.doAnswer(answer).when(impl).reportError("containerId", null, "timeout");
+ Mockito.doAnswer(answer).when(impl).reportError("containerId", null, "timeout", null);
Server server = new RPC.Builder(conf).setProtocol(StreamingContainerUmbilicalProtocol.class).setInstance(impl)
.setBindAddress("0.0.0.0").setPort(0).setNumHandlers(1).setVerbose(false).build();
@@ -548,7 +548,7 @@ public class StramRecoveryTest
rp = new RecoverableRpcProxy(appPath, conf);
protocolProxy = rp.getProxy();
- protocolProxy.reportError("containerId", null, "msg");
+ protocolProxy.reportError("containerId", null, "msg", null);
try {
protocolProxy.log("containerId", "timeout");
Assert.fail("expected socket timeout");
@@ -564,7 +564,7 @@ public class StramRecoveryTest
uri = RecoverableRpcProxy.toConnectURI(address);
recoveryHandler.writeConnectUri(uri.toString());
- protocolProxy.reportError("containerId", null, "timeout");
+ protocolProxy.reportError("containerId", null, "timeout", null);
Assert.assertTrue("timedout", timedout.get());
restoreSystemProperty(RecoverableRpcProxy.RPC_TIMEOUT, rpcTimeout);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/test/java/com/datatorrent/stram/util/LogFileInformationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/util/LogFileInformationTest.java b/engine/src/test/java/com/datatorrent/stram/util/LogFileInformationTest.java
new file mode 100644
index 0000000..32c56c4
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/util/LogFileInformationTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.RollingFileAppender;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class LogFileInformationTest
+{
+ private static final String APPENDER_NAME = "rfa";
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(LogFileInformationTest.class);
+ private static String logFileName;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException
+ {
+ String logFileDir = new File("target" + File.separator + "logDir").getAbsolutePath();
+ logFileName = logFileDir + File.separator + "appTest.log";
+ RollingFileAppender rfa = new RollingFileAppender(new PatternLayout("%d{ISO8601} [%t] %-5p %c{2} %M - %m%n"),
+ logFileName);
+ rfa.setName(APPENDER_NAME);
+ Logger.getRootLogger().addAppender(rfa);
+ }
+
+ @Before
+ public void setup()
+ {
+ LoggerUtil.initializeLogger();
+ }
+
+ @Test
+ public void testGetLogFileInformation()
+ {
+ long currentLogFileSize = LoggerUtil.getLogFileInformation().fileOffset;
+ logger.info("Adding Test log message.");
+ assertEquals(logFileName, LoggerUtil.getLogFileInformation().fileName);
+ assertTrue(LoggerUtil.getLogFileInformation().fileOffset > currentLogFileSize);
+ }
+
+ @Test
+ public void testImmediateFlushOff()
+ {
+ RollingFileAppender rfa = (RollingFileAppender)Logger.getRootLogger().getAppender(APPENDER_NAME);
+ rfa.setImmediateFlush(false);
+ Logger.getRootLogger().addAppender(rfa);
+ LoggerUtil.initializeLogger();
+
+ Assert.assertNull(LoggerUtil.getLogFileInformation());
+ rfa.setImmediateFlush(true);
+ }
+
+ @Test
+ public void testErrorLevelOff()
+ {
+ Level curLogLevel = Logger.getRootLogger().getLevel();
+ Logger.getRootLogger().setLevel(Level.FATAL);
+ LoggerUtil.initializeLogger();
+
+ Assert.assertNull(LoggerUtil.getLogFileInformation());
+ Logger.getRootLogger().setLevel(curLogLevel);
+ }
+
+ @Test
+ public void testNoFileAppender()
+ {
+ RollingFileAppender rfa = (RollingFileAppender)Logger.getRootLogger().getAppender(APPENDER_NAME);
+ Logger.getRootLogger().removeAppender(APPENDER_NAME);
+ LoggerUtil.initializeLogger();
+ Assert.assertNull(LoggerUtil.getLogFileInformation());
+ Logger.getRootLogger().addAppender(rfa);
+ }
+
+ @AfterClass
+ public static void tearDown()
+ {
+ Logger.getRootLogger().removeAppender(APPENDER_NAME);
+ FileUtils.deleteQuietly(new File(logFileName).getParentFile());
+ }
+}
[2/2] apex-core git commit: Merge branch
'APEXCORE-563-event-log-update' of
http://github.com/DT-Priyanka/incubator-apex-core into APEXCORE-563
Posted by vr...@apache.org.
Merge branch 'APEXCORE-563-event-log-update' of http://github.com/DT-Priyanka/incubator-apex-core into APEXCORE-563
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8ce340ef
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8ce340ef
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8ce340ef
Branch: refs/heads/master
Commit: 8ce340ef3e363d188a58ae496d51cec170a58947
Parents: 1e47856 764ca7b
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Fri Mar 3 08:33:27 2017 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Mar 3 08:33:27 2017 -0800
----------------------------------------------------------------------
.../datatorrent/stram/StramLocalCluster.java | 5 +-
.../stram/StreamingContainerParent.java | 9 +-
.../com/datatorrent/stram/api/StramEvent.java | 58 ++++++++--
.../StreamingContainerUmbilicalProtocol.java | 5 +-
.../stram/engine/StreamingContainer.java | 14 ++-
.../com/datatorrent/stram/util/LoggerUtil.java | 77 +++++++++++++
.../org/apache/apex/log/LogFileInformation.java | 45 ++++++++
.../datatorrent/stram/StramRecoveryTest.java | 6 +-
.../stram/util/LogFileInformationTest.java | 111 +++++++++++++++++++
9 files changed, 308 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ce340ef/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ce340ef/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------