You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/03/03 16:40:11 UTC

[1/2] apex-core git commit: APEXCORE-563: Add log filename and offset to the container/operator error events

Repository: apex-core
Updated Branches:
  refs/heads/master 1e4785671 -> 8ce340ef3


APEXCORE-563: Add log filename and offset to the container/operator error events


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/764ca7b2
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/764ca7b2
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/764ca7b2

Branch: refs/heads/master
Commit: 764ca7b285d5af4305b5c792cc1d9154da2f0c0d
Parents: a6dd73b
Author: Priyanka Gugale <pr...@apache.org>
Authored: Tue Dec 13 17:22:47 2016 +0530
Committer: priya <pr...@apache.org>
Committed: Fri Mar 3 10:33:57 2017 +0530

----------------------------------------------------------------------
 .../datatorrent/stram/StramLocalCluster.java    |   5 +-
 .../stram/StreamingContainerParent.java         |   9 +-
 .../com/datatorrent/stram/api/StramEvent.java   |  58 ++++++++--
 .../StreamingContainerUmbilicalProtocol.java    |   5 +-
 .../stram/engine/StreamingContainer.java        |  14 ++-
 .../com/datatorrent/stram/util/LoggerUtil.java  |  77 +++++++++++++
 .../org/apache/apex/log/LogFileInformation.java |  45 ++++++++
 .../datatorrent/stram/StramRecoveryTest.java    |   6 +-
 .../stram/util/LogFileInformationTest.java      | 111 +++++++++++++++++++
 9 files changed, 308 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 4d452af..7ebeea6 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.apex.log.LogFileInformation;
+
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
@@ -105,7 +108,7 @@ public class StramLocalCluster implements Runnable, Controller
     }
 
     @Override
-    public void reportError(String containerId, int[] operators, String msg) throws IOException
+    public void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException
     {
       log(containerId, msg);
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
index e5d8a97..76f89bd 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
@@ -24,6 +24,8 @@ import java.net.InetSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.log.LogFileInformation;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -172,15 +174,16 @@ public class StreamingContainerParent extends org.apache.hadoop.service.Composit
   }
 
   @Override
-  public void reportError(String containerId, int[] operators, String msg) throws IOException
+  public void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException
   {
     if (operators == null || operators.length == 0) {
-      dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg));
+      dagManager.recordEventAsync(new ContainerErrorEvent(containerId, msg, logFileInfo));
     } else {
       for (int operator : operators) {
         OperatorInfo operatorInfo = dagManager.getOperatorInfo(operator);
         if (operatorInfo != null) {
-          dagManager.recordEventAsync(new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg));
+          dagManager.recordEventAsync(new OperatorErrorEvent(operatorInfo.name, operator, containerId, msg,
+              logFileInfo));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
index e9c5f13..d80fa94 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
@@ -20,6 +20,8 @@ package com.datatorrent.stram.api;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.apex.log.LogFileInformation;
+
 import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
 
 /**
@@ -35,13 +37,20 @@ public abstract class StramEvent
   private long timestamp = System.currentTimeMillis();
   private String reason;
   private LogLevel logLevel;
+  private LogFileInformation logFileInformation;
 
   public abstract String getType();
 
   protected StramEvent(LogLevel logLevel)
   {
+    this(logLevel, null);
+  }
+
+  protected StramEvent(LogLevel logLevel, LogFileInformation logFileInformation)
+  {
     id = nextId.getAndIncrement();
     this.logLevel = logLevel;
+    this.logFileInformation = logFileInformation;
   }
 
   public long getId()
@@ -74,6 +83,21 @@ public abstract class StramEvent
     return logLevel;
   }
 
+  public String getLogFileName()
+  {
+    return logFileInformation != null ? logFileInformation.fileName : null;
+  }
+
+  public long getLogFileOffset()
+  {
+    return logFileInformation != null ? logFileInformation.fileOffset : 0;
+  }
+
+  public void setLogFileInfomation(LogFileInformation logFileInformation)
+  {
+    this.logFileInformation = logFileInformation;
+  }
+
   public static enum LogLevel
   {
     TRACE,
@@ -90,7 +114,12 @@ public abstract class StramEvent
 
     public OperatorEvent(String operatorName, LogLevel logLevel)
     {
-      super(logLevel);
+      this(operatorName, logLevel, null);
+    }
+
+    public OperatorEvent(String operatorName, LogLevel logLevel, LogFileInformation logFileInformation)
+    {
+      super(logLevel, logFileInformation);
       this.operatorName = operatorName;
     }
 
@@ -202,7 +231,13 @@ public abstract class StramEvent
 
     public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel)
     {
-      super(operatorName, logLevel);
+      this(operatorName, operatorId, logLevel, null);
+    }
+
+    public PhysicalOperatorEvent(String operatorName, int operatorId, LogLevel logLevel,
+        LogFileInformation logFileInformation)
+    {
+      super(operatorName, logLevel, logFileInformation);
       this.operatorId = operatorId;
     }
 
@@ -492,14 +527,16 @@ public abstract class StramEvent
     private String containerId;
     private String errorMessage;
 
-    public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage)
+    public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage,
+        LogFileInformation logFileInformation)
     {
-      this(operatorName, operatorId, containerId, errorMessage, LogLevel.ERROR);
+      this(operatorName, operatorId, containerId, errorMessage, logFileInformation, LogLevel.ERROR);
     }
 
-    public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage, LogLevel logLevel)
+    public OperatorErrorEvent(String operatorName, int operatorId, String containerId, String errorMessage,
+        LogFileInformation logFileInformation, LogLevel logLevel)
     {
-      super(operatorName, operatorId, logLevel);
+      super(operatorName, operatorId, logLevel, logFileInformation);
       this.containerId = containerId;
       this.errorMessage = errorMessage;
     }
@@ -537,14 +574,15 @@ public abstract class StramEvent
     private String containerId;
     private String errorMessage;
 
-    public ContainerErrorEvent(String containerId, String errorMessage)
+    public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation)
     {
-      this(containerId, errorMessage, LogLevel.ERROR);
+      this(containerId, errorMessage, logFileInformation, LogLevel.ERROR);
     }
 
-    public ContainerErrorEvent(String containerId, String errorMessage, LogLevel logLevel)
+    public ContainerErrorEvent(String containerId, String errorMessage, LogFileInformation logFileInformation,
+        LogLevel logLevel)
     {
-      super(logLevel);
+      super(logLevel, logFileInformation);
       this.containerId = containerId;
       this.errorMessage = errorMessage;
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
index 13832ba..b78e8f2 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StreamingContainerUmbilicalProtocol.java
@@ -23,6 +23,8 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.apex.log.LogFileInformation;
+
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -430,8 +432,9 @@ public interface StreamingContainerUmbilicalProtocol extends VersionedProtocol
    * @param containerId
    * @param operators
    * @param msg
+   * @param logFileInfo
    */
-  void reportError(String containerId, int[] operators, String msg) throws IOException;
+  void reportError(String containerId, int[] operators, String msg, LogFileInformation logFileInfo) throws IOException;
 
   /**
    * To be called periodically by child for heartbeat protocol.

http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 38963c4..437070c 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.log.LogFileInformation;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -315,10 +316,11 @@ public class StreamingContainer extends YarnContainerMain
         stramChild.teardown();
       }
     } catch (Error | Exception e) {
+      LogFileInformation logFileInfo = LoggerUtil.getLogFileInformation();
       logger.error("Fatal {} in container!", (e instanceof Error) ? "Error" : "Exception", e);
       /* Report back any failures, for diagnostic purposes */
       try {
-        umbilical.reportError(childId, null, ExceptionUtils.getStackTrace(e));
+        umbilical.reportError(childId, null, ExceptionUtils.getStackTrace(e), logFileInfo);
       } catch (Exception ex) {
         logger.debug("Fail to log", ex);
       }
@@ -1426,6 +1428,8 @@ public class StreamingContainer extends YarnContainerMain
             node.run(); /* this is a blocking call */
           } catch (Error error) {
             int[] operators;
+            //fetch logFileInfo before logging exception, to get offset before exception
+            LogFileInformation logFileInfo = LoggerUtil.getLogFileInformation();
             if (currentdi == null) {
               logger.error("Voluntary container termination due to an error in operator set {}.", setOperators, error);
               operators = new int[setOperators.size()];
@@ -1438,19 +1442,21 @@ public class StreamingContainer extends YarnContainerMain
               operators = new int[]{currentdi.id};
             }
             try {
-              umbilical.reportError(containerId, operators, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace(error));
+              umbilical.reportError(containerId, operators, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace(error), logFileInfo);
             } catch (Exception e) {
               logger.debug("Fail to log", e);
             } finally {
               System.exit(1);
             }
           } catch (Exception ex) {
+            //fetch logFileInfo before logging exception, to get offset before exception
+            LogFileInformation logFileInfo = LoggerUtil.getLogFileInformation();
             if (currentdi == null) {
               failedNodes.add(ndi.id);
               logger.error("Operator set {} stopped running due to an exception.", setOperators, ex);
               int[] operators = new int[]{ndi.id};
               try {
-                umbilical.reportError(containerId, operators, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace(ex));
+                umbilical.reportError(containerId, operators, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace(ex), logFileInfo);
               } catch (Exception e) {
                 logger.debug("Fail to log", e);
               }
@@ -1459,7 +1465,7 @@ public class StreamingContainer extends YarnContainerMain
               logger.error("Abandoning deployment of operator {} due to setup failure.", currentdi, ex);
               int[] operators = new int[]{currentdi.id};
               try {
-                umbilical.reportError(containerId, operators, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace(ex));
+                umbilical.reportError(containerId, operators, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace(ex), logFileInfo);
               } catch (Exception e) {
                 logger.debug("Fail to log", e);
               }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
index d30958c..ffe9c8c 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.stram.util;
 
+import java.io.File;
 import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.Map;
@@ -26,8 +27,11 @@ import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import org.apache.apex.log.LogFileInformation;
+
 import org.apache.log4j.Appender;
 import org.apache.log4j.Category;
+import org.apache.log4j.FileAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -36,6 +40,7 @@ import org.apache.log4j.spi.HierarchyEventListener;
 import org.apache.log4j.spi.LoggerFactory;
 import org.apache.log4j.spi.LoggerRepository;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -182,9 +187,19 @@ public class LoggerUtil
     }
   }
 
+  private static FileAppender fileAppender;
+  private static boolean shouldFetchLogFileInfo;
   static {
     logger.debug("initializing LoggerUtil");
+    initializeLogger();
+  }
+
+  @VisibleForTesting
+  static void initializeLogger()
+  {
     LogManager.setRepositorySelector(new DefaultRepositorySelector(new DelegatingLoggerRepository(LogManager.getLoggerRepository())), null);
+    fileAppender = getFileAppender();
+    shouldFetchLogFileInfo = shouldFetchLogFileInformation();
   }
 
   private static synchronized Level getLevelFor(String name)
@@ -278,4 +293,66 @@ public class LoggerUtil
     }
     return ImmutableMap.copyOf(matchedClasses);
   }
+
+  /**
+   * Returns logger log file {@link LogFileInformation}
+   * @return logFileInformation
+   */
+  public static LogFileInformation getLogFileInformation()
+  {
+    if (shouldFetchLogFileInfo) {
+      File logFile = new File(fileAppender.getFile());
+      LogFileInformation logFileInfo = new LogFileInformation(fileAppender.getFile(), logFile.length());
+      return logFileInfo;
+    }
+    return null;
+  }
+
+  private static FileAppender getFileAppender()
+  {
+    Enumeration<Appender> e = LogManager.getRootLogger().getAllAppenders();
+    FileAppender fileAppender = null;
+    while (e.hasMoreElements()) {
+      Appender appender = e.nextElement();
+      if (appender instanceof FileAppender) {
+        if (fileAppender == null) {
+          fileAppender = (FileAppender)appender;
+        } else {
+          //skip fetching log file information if we have multiple file Appenders
+          return null;
+        }
+      }
+    }
+    return fileAppender;
+  }
+
+  /*
+   * We should return log file information only if,
+   * we have single file Appender, the logging level of appender is set to level Error or above and immediateFlush is set to true.
+   * In future we should be able to enhance this feature to support multiple file appenders.
+   */
+  private static boolean shouldFetchLogFileInformation()
+  {
+    if (fileAppender != null && isErrorLevelEnable() && fileAppender.getImmediateFlush()) {
+      return true;
+    }
+    logger.warn(
+        "Log information is unavailable. To enable log information log4j/logging should be configured with single FileAppender that has immediateFlush set to true and log level set to ERROR or greater.");
+    return false;
+  }
+
+  private static boolean isErrorLevelEnable()
+  {
+    if (fileAppender != null) {
+      Level p = (Level)fileAppender.getThreshold();
+      if (p == null) {
+        p = LogManager.getRootLogger().getLevel();
+      }
+      if (p != null) {
+        return Level.ERROR.isGreaterOrEqual(p);
+      }
+    }
+    return false;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/main/java/org/apache/apex/log/LogFileInformation.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/log/LogFileInformation.java b/engine/src/main/java/org/apache/apex/log/LogFileInformation.java
new file mode 100644
index 0000000..841d847
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/log/LogFileInformation.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.log;
+
+import com.datatorrent.stram.util.AbstractWritableAdapter;
+
+public class LogFileInformation extends AbstractWritableAdapter
+{
+
+  private static final long serialVersionUID = 1L;
+  public String fileName;
+  public long fileOffset;
+
+  public LogFileInformation()
+  {
+  }
+
+  public LogFileInformation(String fileName, long fileOffset)
+  {
+    this.fileName = fileName;
+    this.fileOffset = fileOffset;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "LogFileInformation [fileName=" + fileName + ", fileOffset=" + fileOffset + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index e8ec26c..8a44508 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -494,7 +494,7 @@ public class StramRecoveryTest
       }
     };
     Mockito.doAnswer(answer).when(impl).log("containerId", "timeout");
-    Mockito.doAnswer(answer).when(impl).reportError("containerId", null, "timeout");
+    Mockito.doAnswer(answer).when(impl).reportError("containerId", null, "timeout", null);
 
     Server server = new RPC.Builder(conf).setProtocol(StreamingContainerUmbilicalProtocol.class).setInstance(impl)
         .setBindAddress("0.0.0.0").setPort(0).setNumHandlers(1).setVerbose(false).build();
@@ -548,7 +548,7 @@ public class StramRecoveryTest
 
     rp = new RecoverableRpcProxy(appPath, conf);
     protocolProxy = rp.getProxy();
-    protocolProxy.reportError("containerId", null, "msg");
+    protocolProxy.reportError("containerId", null, "msg", null);
     try {
       protocolProxy.log("containerId", "timeout");
       Assert.fail("expected socket timeout");
@@ -564,7 +564,7 @@ public class StramRecoveryTest
     uri = RecoverableRpcProxy.toConnectURI(address);
     recoveryHandler.writeConnectUri(uri.toString());
 
-    protocolProxy.reportError("containerId", null, "timeout");
+    protocolProxy.reportError("containerId", null, "timeout", null);
     Assert.assertTrue("timedout", timedout.get());
 
     restoreSystemProperty(RecoverableRpcProxy.RPC_TIMEOUT, rpcTimeout);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/764ca7b2/engine/src/test/java/com/datatorrent/stram/util/LogFileInformationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/util/LogFileInformationTest.java b/engine/src/test/java/com/datatorrent/stram/util/LogFileInformationTest.java
new file mode 100644
index 0000000..32c56c4
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/util/LogFileInformationTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.RollingFileAppender;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class LogFileInformationTest
+{
+  private static final String APPENDER_NAME = "rfa";
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(LogFileInformationTest.class);
+  private static String logFileName;
+
+  @BeforeClass
+  public static void beforeClass() throws IOException
+  {
+    String logFileDir = new File("target" + File.separator + "logDir").getAbsolutePath();
+    logFileName = logFileDir + File.separator + "appTest.log";
+    RollingFileAppender rfa = new RollingFileAppender(new PatternLayout("%d{ISO8601} [%t] %-5p %c{2} %M - %m%n"),
+        logFileName);
+    rfa.setName(APPENDER_NAME);
+    Logger.getRootLogger().addAppender(rfa);
+  }
+
+  @Before
+  public void setup()
+  {
+    LoggerUtil.initializeLogger();
+  }
+
+  @Test
+  public void testGetLogFileInformation()
+  {
+    long currentLogFileSize = LoggerUtil.getLogFileInformation().fileOffset;
+    logger.info("Adding Test log message.");
+    assertEquals(logFileName, LoggerUtil.getLogFileInformation().fileName);
+    assertTrue(LoggerUtil.getLogFileInformation().fileOffset > currentLogFileSize);
+  }
+
+  @Test
+  public void testImmediateFlushOff()
+  {
+    RollingFileAppender rfa = (RollingFileAppender)Logger.getRootLogger().getAppender(APPENDER_NAME);
+    rfa.setImmediateFlush(false);
+    Logger.getRootLogger().addAppender(rfa);
+    LoggerUtil.initializeLogger();
+
+    Assert.assertNull(LoggerUtil.getLogFileInformation());
+    rfa.setImmediateFlush(true);
+  }
+
+  @Test
+  public void testErrorLevelOff()
+  {
+    Level curLogLevel = Logger.getRootLogger().getLevel();
+    Logger.getRootLogger().setLevel(Level.FATAL);
+    LoggerUtil.initializeLogger();
+
+    Assert.assertNull(LoggerUtil.getLogFileInformation());
+    Logger.getRootLogger().setLevel(curLogLevel);
+  }
+
+  @Test
+  public void testNoFileAppender()
+  {
+    RollingFileAppender rfa = (RollingFileAppender)Logger.getRootLogger().getAppender(APPENDER_NAME);
+    Logger.getRootLogger().removeAppender(APPENDER_NAME);
+    LoggerUtil.initializeLogger();
+    Assert.assertNull(LoggerUtil.getLogFileInformation());
+    Logger.getRootLogger().addAppender(rfa);
+  }
+
+  @AfterClass
+  public static void tearDown()
+  {
+    Logger.getRootLogger().removeAppender(APPENDER_NAME);
+    FileUtils.deleteQuietly(new File(logFileName).getParentFile());
+  }
+}


[2/2] apex-core git commit: Merge branch 'APEXCORE-563-event-log-update' of http://github.com/DT-Priyanka/incubator-apex-core into APEXCORE-563

Posted by vr...@apache.org.
Merge branch 'APEXCORE-563-event-log-update' of http://github.com/DT-Priyanka/incubator-apex-core into APEXCORE-563


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8ce340ef
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8ce340ef
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8ce340ef

Branch: refs/heads/master
Commit: 8ce340ef3e363d188a58ae496d51cec170a58947
Parents: 1e47856 764ca7b
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Fri Mar 3 08:33:27 2017 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Mar 3 08:33:27 2017 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/StramLocalCluster.java    |   5 +-
 .../stram/StreamingContainerParent.java         |   9 +-
 .../com/datatorrent/stram/api/StramEvent.java   |  58 ++++++++--
 .../StreamingContainerUmbilicalProtocol.java    |   5 +-
 .../stram/engine/StreamingContainer.java        |  14 ++-
 .../com/datatorrent/stram/util/LoggerUtil.java  |  77 +++++++++++++
 .../org/apache/apex/log/LogFileInformation.java |  45 ++++++++
 .../datatorrent/stram/StramRecoveryTest.java    |   6 +-
 .../stram/util/LogFileInformationTest.java      | 111 +++++++++++++++++++
 9 files changed, 308 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ce340ef/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8ce340ef/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------