You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/01/16 02:56:40 UTC

git commit: [HELIX-356] add a tool for grep zk transaction/snapshot logs based on time, rb=16935

Updated Branches:
  refs/heads/master 2981d6aa8 -> 027eea291


[HELIX-356] add a tool for grep zk transaction/snapshot logs based on time,rb=16935


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/027eea29
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/027eea29
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/027eea29

Branch: refs/heads/master
Commit: 027eea291e86335063ca3bfa923515c3511f88d9
Parents: 2981d6a
Author: zzhang <zz...@uci.edu>
Authored: Wed Jan 15 17:56:27 2014 -0800
Committer: zzhang <zz...@uci.edu>
Committed: Wed Jan 15 17:56:27 2014 -0800

----------------------------------------------------------------------
 helix-core/pom.xml                              |   8 +-
 .../java/org/apache/helix/tools/ZkGrep.java     | 641 +++++++++++++++++++
 .../org/apache/helix/tools/ZkLogAnalyzer.java   | 444 -------------
 3 files changed, 645 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/027eea29/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index f30abac..3bd9812 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -201,10 +201,6 @@ under the License.
               <name>start-standalone-zookeeper</name>
             </program>
             <program>
-              <mainClass>org.apache.helix.tools.ZkLogAnalyzer</mainClass>
-              <name>zk-log-analyzer</name>
-            </program>
-            <program>
               <mainClass>org.apache.helix.tools.JmxDumper</mainClass>
               <name>JmxDumper</name>
             </program>
@@ -216,6 +212,10 @@ under the License.
               <mainClass>org.apache.helix.tools.IntegrationTestUtil</mainClass>
               <name>test-util</name>
             </program>
+            <program>
+              <mainClass>org.apache.helix.tools.ZkGrep</mainClass>
+              <name>zkgrep</name>
+            </program>
           </programs>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/helix/blob/027eea29/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java b/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java
new file mode 100644
index 0000000..3abf78f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZkGrep.java
@@ -0,0 +1,641 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * utility for grep zk transaction/snapshot logs
+ * - to grep a pattern by t1 use:
+ * zkgrep --zkCfg zkCfg --by t1 --pattern patterns...
+ * - to grep a pattern between t1 and t2 use:
+ * zkgrep --zkCfg zkCfg --between t1 t2 --pattern patterns...
+ * for example, to find fail-over latency between t1 and t2, use:
+ * 1) zkgrep --zkCfg zkCfg --by t1 --pattern "/{cluster}/LIVEINSTANCES/" | grep {fail-node}
+ * 2) zkgrep --zkCfg zkCfg --between t1 t2 --pattern "closeSession" | grep {fail-node session-id}
+ * 3) zkgrep --zkCfg zkCfg --between t1 t2 --pattern "/{cluster}" | grep "CURRENTSTATES" |
+ * grep "setData" | tail -1
+ * fail-over latency = timestamp difference between 2) and 3)
+ */
+public class ZkGrep {
+  private static final Logger LOG = Logger.getLogger(ZkGrep.class);
+
+  // long-option names accepted on the command line
+  private static final String zkCfg = "zkCfg";
+  private static final String pattern = "pattern";
+  private static final String by = "by";
+  private static final String between = "between";
+
+  // substrings used to select transaction-log / snapshot files by name; the same values are
+  // passed to ZKLogFormatter as its first argument
+  public static final String log = "log";
+  public static final String snapshot = "snapshot";
+
+  // file-name suffix of gzip-compressed zk logs/snapshots
+  private static final String gzSuffix = ".gz";
+
+  /**
+   * Build the command-line options understood by zkgrep: a non-required --zkCfg, a required
+   * --pattern accepting multiple values, and a required option group holding --between and --by.
+   * @return the assembled options
+   */
+  @SuppressWarnings("static-access")
+  private static Options constructCommandLineOptions() {
+    Options cliOptions = new Options();
+
+    // --zkCfg <zoo.cfg>
+    Option cfgOpt =
+        OptionBuilder.withLongOpt(zkCfg).withArgName("zoo.cfg")
+            .withDescription("provide zoo.cfg").hasArgs(1).isRequired(false).create();
+    cliOptions.addOption(cfgOpt);
+
+    // --pattern <grep-patterns...>
+    Option patternOpt =
+        OptionBuilder.withLongOpt(pattern).withArgName("grep-patterns...")
+            .withDescription("provide patterns (required)").hasArgs().isRequired(true)
+            .create();
+    cliOptions.addOption(patternOpt);
+
+    // --between <t1 t2>
+    Option betweenOpt =
+        OptionBuilder.withLongOpt(between)
+            .withArgName("t1 t2 (timestamp in ms or yyMMdd_hhmmss_SSS)")
+            .withDescription("grep between t1 and t2").hasArgs(2).isRequired(false).create();
+
+    // --by <t>
+    Option byOpt =
+        OptionBuilder.withLongOpt(by).withArgName("t (timestamp in ms or yyMMdd_hhmmss_SSS)")
+            .withDescription("grep by t").hasArgs(1).isRequired(false).create();
+
+    // the time selectors go into one required group
+    OptionGroup timeGroup = new OptionGroup();
+    timeGroup.setRequired(true);
+    timeGroup.addOption(betweenOpt);
+    timeGroup.addOption(byOpt);
+    cliOptions.addOptionGroup(timeGroup);
+
+    return cliOptions;
+  }
+
+  /**
+   * get zk transaction log dir and zk snapshot log dir
+   * @param zkCfgFile
+   * @return String[0]: zk-transaction-log-dir, String[1]: zk-snapshot-dir
+   */
+  static String[] getZkDataDirs(String zkCfgFile) {
+    String[] zkDirs = new String[2];
+
+    FileInputStream fis = null;
+    BufferedReader br = null;
+    try {
+      fis = new FileInputStream(zkCfgFile);
+      br = new BufferedReader(new InputStreamReader(fis));
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        String key = "dataDir=";
+        if (line.startsWith(key)) {
+          zkDirs[1] = zkDirs[0] = line.substring(key.length()) + "/version-2";
+        }
+
+        key = "dataLogDir=";
+        if (line.startsWith(key)) {
+          zkDirs[0] = line.substring(key.length()) + "/version-2";
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("exception in read file: " + zkCfgFile, e);
+    } finally {
+      try {
+        if (br != null) {
+          br.close();
+        }
+
+        if (fis != null) {
+          fis.close();
+        }
+
+      } catch (Exception e) {
+        LOG.error("exception in closing file: " + zkCfgFile, e);
+      }
+    }
+
+    return zkDirs;
+  }
+
+  /**
+   * Debug helper: print the name and last-modified time of each file to stdout,
+   * bracketed by START/END marker lines
+   * @param files files to print
+   */
+  static void printFiles(File[] files) {
+    System.out.println("START print");
+    for (File f : files) {
+      String name = f.getName();
+      long modified = f.lastModified();
+      System.out.println(name + ", " + modified);
+    }
+    System.out.println("END print");
+  }
+
+  /**
+   * get regular files under a dir whose names contain pattern, in ascending order of last
+   * modified time
+   * @param dirPath directory to list
+   * @param pattern substring a file name must contain to be included
+   * @return matching files, oldest first; an empty array if dirPath cannot be listed
+   */
+  static File[] getSortedFiles(String dirPath, final String pattern) {
+    File dir = new File(dirPath);
+    File[] files = dir.listFiles(new FileFilter() {
+
+      @Override
+      public boolean accept(File file) {
+        return file.isFile() && (file.getName().indexOf(pattern) != -1);
+      }
+    });
+
+    // listFiles() returns null when dirPath is not a directory or an I/O error occurs
+    if (files == null) {
+      return new File[0];
+    }
+
+    Arrays.sort(files, new Comparator<File>() {
+
+      @Override
+      public int compare(File o1, File o2) {
+        // compare the two longs directly instead of subtracting and rounding through a float
+        long t1 = o1.lastModified();
+        long t2 = o2.lastModified();
+        return (t1 < t2) ? -1 : ((t1 > t2) ? 1 : 0);
+      }
+
+    });
+    return files;
+  }
+
+  /**
+   * look up the value of a "name:value" attribute in a parsed zk log line; e.g. for
+   * "time:1384984016778 session:0x14257d1d17e0004 cxid:0x5 zxid:0x46899 type:error err:-101"
+   * the attribute "time" yields "1384984016778"
+   * @param line whitespace-separated attributes, may be null
+   * @param attribute attribute name, with or without the trailing ':'
+   * @return value of the first matching attribute, or null if line is null or nothing matches
+   */
+  static String getAttributeValue(String line, String attribute) {
+    if (line == null) {
+      return null;
+    }
+
+    // tokens have the form "name:value", so match on the name followed by a colon
+    String prefix = attribute.endsWith(":") ? attribute : attribute + ":";
+
+    for (String token : line.split("\\s")) {
+      if (token.startsWith(prefix)) {
+        return token.substring(prefix.length());
+      }
+    }
+    return null;
+  }
+
+  /**
+   * get the value of the "time" attribute of a parsed zk log line as a long
+   * @param line parsed zk log line
+   * @return timestamp
+   * @throws NumberFormatException if Long.parseLong rejects the extracted value
+   */
+  static long getTimestamp(String line) {
+    return Long.parseLong(getAttributeValue(line, "time"));
+  }
+
+  /**
+   * parse a time string either in timestamp (ms) form or in "yyMMdd_HHmmss_SSS" form;
+   * the hour field is read on a 24-hour clock
+   * @param time
+   * @return timestamp or -1 on error
+   */
+  static long parseTimeString(String time) {
+    try {
+      return Long.parseLong(time);
+    } catch (NumberFormatException e) {
+      // not a plain number; fall through and try the formatted form
+    }
+
+    try {
+      // "HH" (0-23) rather than "hh" (1-12): with "hh" and no am/pm marker, hour 12 parses as 0
+      SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_HHmmss_SSS");
+      return formatter.parse(time).getTime();
+    } catch (java.text.ParseException ex) {
+      // report the date-parsing failure itself, not the earlier number-format one
+      LOG.error("fail to parse time string: " + time, ex);
+    }
+    return -1;
+  }
+
+  /**
+   * grep a parsed zk transaction log: print to stdout every line whose "time" value lies
+   * within [start, end] and which contains all of the given patterns. Reading stops at the
+   * first line whose time is after end; lines without a parsable time are skipped.
+   * Exceptions are logged, not thrown.
+   * @param zkLog parsed zk transaction log
+   * @param start lower time bound (inclusive)
+   * @param end upper time bound (inclusive)
+   * @param patterns substrings that must all occur in a line for it to be printed
+   */
+  public static void grepZkLog(File zkLog, long start, long end, String... patterns) {
+    FileInputStream in = null;
+    BufferedReader reader = null;
+    try {
+      in = new FileInputStream(zkLog);
+      reader = new BufferedReader(new InputStreamReader(in));
+
+      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+        long timestamp;
+        try {
+          timestamp = getTimestamp(line);
+        } catch (NumberFormatException e) {
+          // ignore lines that carry no usable time value
+          continue;
+        }
+
+        if (timestamp > end) {
+          break;
+        }
+        if (timestamp < start) {
+          continue;
+        }
+
+        boolean allFound = true;
+        for (int i = 0; allFound && i < patterns.length; i++) {
+          allFound = line.indexOf(patterns[i]) != -1;
+        }
+
+        if (allFound) {
+          System.out.println(line);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("exception in grep zk-log: " + zkLog, e);
+    } finally {
+      try {
+        if (reader != null) {
+          reader.close();
+        }
+
+        if (in != null) {
+          in.close();
+        }
+
+      } catch (Exception e) {
+        LOG.error("exception in closing zk-log: " + zkLog, e);
+      }
+    }
+  }
+
+  public static void grepZkLogDir(List<File> parsedZkLogs, long start, long end, String... patterns) {
+    for (File file : parsedZkLogs) {
+      grepZkLog(file, start, end, patterns);
+
+    }
+
+  }
+
+  /**
+   * grep a parsed zk snapshot: print to stdout every line that contains all of the given
+   * patterns. Exceptions are logged, not thrown.
+   * @param zkSnapshot parsed zk snapshot file
+   * @param patterns substrings that must all occur in a line for it to be printed
+   */
+  public static void grepZkSnapshot(File zkSnapshot, String... patterns) {
+    FileInputStream fis = null;
+    BufferedReader br = null;
+    try {
+      fis = new FileInputStream(zkSnapshot);
+      br = new BufferedReader(new InputStreamReader(fis));
+
+      String line;
+      while ((line = br.readLine()) != null) {
+        // nothing in this loop parses numbers, so no per-line NumberFormatException handling
+        boolean match = true;
+        for (String pattern : patterns) {
+          if (line.indexOf(pattern) == -1) {
+            match = false;
+            break;
+          }
+        }
+
+        if (match) {
+          System.out.println(line);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("exception in grep zk-snapshot: " + zkSnapshot, e);
+    } finally {
+      try {
+        if (br != null) {
+          br.close();
+        }
+
+        if (fis != null) {
+          fis.close();
+        }
+
+      } catch (Exception e) {
+        LOG.error("exception in closing zk-snapshot: " + zkSnapshot, e);
+      }
+    }
+  }
+
+  /**
+   * guess zoo.cfg dir
+   * Not implemented yet: this stub always returns null.
+   * @return absolute path to zoo.cfg (intended); currently always null
+   */
+  static String guessZkCfgDir() {
+    // TODO impl this
+    return null;
+  }
+
+  /**
+   * print the command-line help for the given options, using a 1000-column width
+   * @param cliOptions options to describe
+   */
+  public static void printUsage(Options cliOptions) {
+    String commandLine = "java " + ZkGrep.class.getName();
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.setWidth(1000);
+    formatter.printHelp(commandLine, cliOptions);
+  }
+
+  /**
+   * parse zk-transaction-logs between start and end, if not already parsed.
+   * Logs last modified before start are skipped; the first log last modified after end is
+   * still included, and nothing after it is. A log is (re)parsed when its ".parsed" file
+   * under ~/zklog-parsed is missing or not newer than the log. Gzipped logs are copied to
+   * ~/zklog-parsed and gunzipped there first; those temporary files are removed even when
+   * parsing fails. A log that fails to parse is logged and left out of the result.
+   * @param zkLogDir
+   * @param start
+   * @param end
+   * @return list of parsed zklogs between start and end, in order of last modified timestamp
+   */
+  static List<File> parseZkLogs(String zkLogDir, long start, long end) {
+    File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+    File[] zkLogs = getSortedFiles(zkLogDir, log);
+    List<File> parsedZkLogs = new ArrayList<File>();
+
+    for (File zkLog : zkLogs) {
+      long lastModified = zkLog.lastModified();
+      if (lastModified < start) {
+        continue;
+      }
+
+      try {
+        File parsedZkLog = new File(zkParsedDir, stripGzSuffix(zkLog.getName()) + ".parsed");
+        if (!parsedZkLog.exists() || parsedZkLog.lastModified() <= lastModified) {
+
+          if (zkLog.getName().endsWith(gzSuffix)) {
+            File zkLogGz = new File(zkParsedDir, zkLog.getName());
+            File tmpZkLog = null;
+            try {
+              // copy and gunzip it
+              FileUtils.copyFileToDirectory(zkLog, zkParsedDir);
+              tmpZkLog = gunzip(zkLogGz);
+              if (tmpZkLog == null) {
+                throw new IOException("fail to gunzip file: " + zkLogGz);
+              }
+
+              // parse gunzip file
+              ZKLogFormatter.main(new String[] {
+                  log, tmpZkLog.getAbsolutePath(), parsedZkLog.getAbsolutePath()
+              });
+            } finally {
+              // delete the temporary copies whether or not parsing succeeded
+              zkLogGz.delete();
+              if (tmpZkLog != null) {
+                tmpZkLog.delete();
+              }
+            }
+          } else {
+            // parse it directly
+            ZKLogFormatter.main(new String[] {
+                log, zkLog.getAbsolutePath(), parsedZkLog.getAbsolutePath()
+            });
+          }
+        }
+        parsedZkLogs.add(parsedZkLog);
+      } catch (Exception e) {
+        LOG.error("fail to parse zkLog: " + zkLog, e);
+      }
+
+      // this log was written to after end, so later logs need not be looked at
+      if (lastModified > end) {
+        break;
+      }
+    }
+
+    return parsedZkLogs;
+  }
+
+  /**
+   * Drop a trailing ".gz" from a file name
+   * @param filename
+   * @return filename without its ".gz" suffix, or filename unchanged if it has none
+   */
+  static String stripGzSuffix(String filename) {
+    boolean gzipped = filename.endsWith(gzSuffix);
+    return gzipped ? filename.substring(0, filename.length() - gzSuffix.length()) : filename;
+  }
+
+  /**
+   * Gunzip a file into a sibling file named like the input without its ".gz" suffix.
+   * Streams are closed on every path; a partially written output file is removed on failure.
+   * @param zipFile gzip-compressed input; its name must end with ".gz"
+   * @return the gunzipped file, or null on error (the error is logged)
+   */
+  static File gunzip(File zipFile) {
+    File outputFile = new File(stripGzSuffix(zipFile.getAbsolutePath()));
+    if (outputFile.equals(zipFile.getAbsoluteFile())) {
+      // without a .gz suffix the output would overwrite the input while it is being read
+      LOG.error("fail to gunzip file: " + zipFile + ", name doesn't end with " + gzSuffix);
+      return null;
+    }
+
+    FileInputStream fis = null;
+    GZIPInputStream gzis = null;
+    FileOutputStream out = null;
+    boolean done = false;
+    try {
+      fis = new FileInputStream(zipFile);
+      gzis = new GZIPInputStream(fis);
+      out = new FileOutputStream(outputFile);
+
+      byte[] buffer = new byte[1024];
+      int len;
+      while ((len = gzis.read(buffer)) > 0) {
+        out.write(buffer, 0, len);
+      }
+
+      // close explicitly so that a failure while finishing the output is reported
+      out.close();
+      done = true;
+    } catch (IOException e) {
+      LOG.error("fail to gunzip file: " + zipFile, e);
+    } finally {
+      closeQuietly(out);
+      // gzis is null if its constructor failed; close the raw stream in that case
+      if (gzis != null) {
+        closeQuietly(gzis);
+      } else {
+        closeQuietly(fis);
+      }
+
+      if (!done && out != null) {
+        // don't leave a truncated file behind
+        outputFile.delete();
+      }
+    }
+
+    return done ? outputFile : null;
+  }
+
+  /**
+   * Close a stream, ignoring a null argument and any error raised by close()
+   * @param closeable stream to close, may be null
+   */
+  private static void closeQuietly(java.io.Closeable closeable) {
+    if (closeable == null) {
+      return;
+    }
+    try {
+      closeable.close();
+    } catch (IOException ignored) {
+      // best effort: the outcome was already decided by the caller
+    }
+  }
+
+  /**
+   * parse the last zk-snapshot by by-time, if not already parsed.
+   * The snapshot chosen is the most recently modified one whose last-modified time is
+   * strictly before byTime. It is (re)parsed when its ".parsed" file under ~/zklog-parsed is
+   * missing or not newer than the snapshot. A gzipped snapshot is copied to ~/zklog-parsed
+   * and gunzipped there first; those temporary files are removed even when parsing fails.
+   * @param zkSnapshotDir
+   * @param byTime
+   * @return File array which the first element is the last zk-snapshot by by-time and the second
+   *         element is its parsed file; null if there is no such snapshot or parsing fails
+   */
+  static File[] parseZkSnapshot(String zkSnapshotDir, long byTime) {
+    File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+    File[] zkSnapshots = getSortedFiles(zkSnapshotDir, snapshot);
+
+    // snapshots are sorted oldest first: keep the last one modified before byTime
+    File lastZkSnapshot = null;
+    for (File zkSnapshot : zkSnapshots) {
+      if (zkSnapshot.lastModified() >= byTime) {
+        break;
+      }
+      lastZkSnapshot = zkSnapshot;
+    }
+
+    if (lastZkSnapshot == null) {
+      LOG.error("no zkSnapshot last modified before " + byTime + " under: " + zkSnapshotDir);
+      return null;
+    }
+
+    try {
+      File parsedZkSnapshot =
+          new File(zkParsedDir, stripGzSuffix(lastZkSnapshot.getName()) + ".parsed");
+      if (!parsedZkSnapshot.exists()
+          || parsedZkSnapshot.lastModified() <= lastZkSnapshot.lastModified()) {
+
+        if (lastZkSnapshot.getName().endsWith(gzSuffix)) {
+          File lastZkSnapshotGz = new File(zkParsedDir, lastZkSnapshot.getName());
+          File tmpLastZkSnapshot = null;
+          try {
+            // copy and gunzip it
+            FileUtils.copyFileToDirectory(lastZkSnapshot, zkParsedDir);
+            tmpLastZkSnapshot = gunzip(lastZkSnapshotGz);
+            if (tmpLastZkSnapshot == null) {
+              throw new IOException("fail to gunzip file: " + lastZkSnapshotGz);
+            }
+
+            // parse gunzip file
+            ZKLogFormatter.main(new String[] {
+                snapshot, tmpLastZkSnapshot.getAbsolutePath(), parsedZkSnapshot.getAbsolutePath()
+            });
+          } finally {
+            // delete the temporary copies whether or not parsing succeeded
+            lastZkSnapshotGz.delete();
+            if (tmpLastZkSnapshot != null) {
+              tmpLastZkSnapshot.delete();
+            }
+          }
+        } else {
+          // parse it directly
+          ZKLogFormatter.main(new String[] {
+              snapshot, lastZkSnapshot.getAbsolutePath(), parsedZkSnapshot.getAbsolutePath()
+          });
+        }
+
+      }
+      return new File[] {
+          lastZkSnapshot, parsedZkSnapshot
+      };
+    } catch (Exception e) {
+      LOG.error("fail to parse zkSnapshot: " + lastZkSnapshot, e);
+    }
+
+    return null;
+  }
+
+  public static void processCommandLineArgs(String[] cliArgs) {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    CommandLine cmd = null;
+
+    try {
+      cmd = cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe) {
+      System.err.println("CommandLineClient: failed to parse command-line options: " + pe);
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+
+    String zkCfgDirValue = null;
+    String zkCfgFile = null;
+
+    if (cmd.hasOption(zkCfg)) {
+      zkCfgDirValue = cmd.getOptionValue(zkCfg);
+    }
+
+    if (zkCfgDirValue == null) {
+      zkCfgDirValue = guessZkCfgDir();
+    }
+
+    if (zkCfgDirValue == null) {
+      LOG.error("couldn't figure out path to zkCfg file");
+      System.exit(1);
+    }
+
+    // get zoo.cfg path from cfg-dir
+    zkCfgFile = zkCfgDirValue;
+    if (!zkCfgFile.endsWith(".cfg")) {
+      // append with default zoo.cfg
+      zkCfgFile = zkCfgFile + "/zoo.cfg";
+    }
+
+    if (!new File(zkCfgFile).exists()) {
+      LOG.error("zoo.cfg file doen't exist: " + zkCfgFile);
+      System.exit(1);
+    }
+
+    String[] patterns = cmd.getOptionValues(pattern);
+
+    String[] zkDataDirs = getZkDataDirs(zkCfgFile);
+
+    // parse zk data files
+    if (zkDataDirs == null || zkDataDirs[0] == null || zkDataDirs[1] == null) {
+      LOG.error("invalid zkCfgDir: " + zkCfgDirValue);
+      System.exit(1);
+    }
+
+    File zkParsedDir = new File(String.format("%s/zklog-parsed", System.getProperty("user.home")));
+    if (!zkParsedDir.exists()) {
+      LOG.info("creating zklog-parsed dir: " + zkParsedDir.getAbsolutePath());
+      zkParsedDir.mkdir();
+    }
+
+    if (cmd.hasOption(between)) {
+      String[] timeStrings = cmd.getOptionValues(between);
+
+      long startTime = parseTimeString(timeStrings[0]);
+      if (startTime == -1) {
+        LOG.error("invalid start time string: " + timeStrings[0]
+            + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+        System.exit(1);
+      }
+
+      long endTime = parseTimeString(timeStrings[1]);
+      if (endTime == -1) {
+        LOG.error("invalid end time string: " + timeStrings[1]
+            + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+        System.exit(1);
+      }
+
+      if (startTime > endTime) {
+        LOG.warn("empty window: " + startTime + " - " + endTime);
+        System.exit(1);
+      }
+      // zkDataDirs[0] is the transaction log dir
+      List<File> parsedZkLogs = parseZkLogs(zkDataDirs[0], startTime, endTime);
+      grepZkLogDir(parsedZkLogs, startTime, endTime, patterns);
+
+    } else if (cmd.hasOption(by)) {
+      String timeString = cmd.getOptionValue(by);
+
+      long byTime = parseTimeString(timeString);
+      if (byTime == -1) {
+        LOG.error("invalid by time string: " + timeString
+            + ", should be either timestamp or yyMMdd_hhmmss_SSS");
+        System.exit(1);
+      }
+
+      // zkDataDirs[1] is the snapshot dir
+      File[] lastZkSnapshot = parseZkSnapshot(zkDataDirs[1], byTime);
+
+      // lastZkSnapshot[1] is the parsed last snapshot by byTime
+      grepZkSnapshot(lastZkSnapshot[1], patterns);
+
+      // need to grep transaction logs between last-modified-time of snapshot and byTime also
+      // lastZkSnapshot[0] is the last snapshot by byTime
+      long startTime = lastZkSnapshot[0].lastModified();
+
+      // zkDataDirs[0] is the transaction log dir
+      List<File> parsedZkLogs = parseZkLogs(zkDataDirs[0], startTime, byTime);
+      grepZkLogDir(parsedZkLogs, startTime, byTime, patterns);
+    }
+  }
+
+  /**
+   * command-line entry point; hands the arguments to processCommandLineArgs
+   * @param args command-line arguments
+   */
+  public static void main(String[] args) {
+    processCommandLineArgs(args);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/027eea29/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java b/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
deleted file mode 100644
index 8b32ddc..0000000
--- a/helix-core/src/main/java/org/apache/helix/tools/ZkLogAnalyzer.java
+++ /dev/null
@@ -1,444 +0,0 @@
-package org.apache.helix.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.Message.MessageState;
-import org.apache.log4j.Logger;
-
-public class ZkLogAnalyzer {
-  private static Logger LOG = Logger.getLogger(ZkLogAnalyzer.class);
-  private static boolean dump = false;;
-  final static ZNRecordSerializer _deserializer = new ZNRecordSerializer();
-
-  static class Stats {
-    int msgSentCount = 0;
-    int msgSentCount_O2S = 0; // Offline to Slave
-    int msgSentCount_S2M = 0; // Slave to Master
-    int msgSentCount_M2S = 0; // Master to Slave
-    int msgDeleteCount = 0;
-    int msgModifyCount = 0;
-    int curStateCreateCount = 0;
-    int curStateUpdateCount = 0;
-    int extViewCreateCount = 0;
-    int extViewUpdateCount = 0;
-  }
-
-  static String getAttributeValue(String line, String attribute) {
-    if (line == null)
-      return null;
-    String[] parts = line.split("\\s");
-    if (parts != null && parts.length > 0) {
-      for (int i = 0; i < parts.length; i++) {
-        if (parts[i].startsWith(attribute)) {
-          String val = parts[i].substring(attribute.length());
-          return val;
-        }
-      }
-    }
-    return null;
-  }
-
-  static String findLastCSUpdateBetween(List<String> csUpdateLines, long start, long end) {
-    long lastCSUpdateTimestamp = Long.MIN_VALUE;
-    String lastCSUpdateLine = null;
-    for (String line : csUpdateLines) {
-      // ZNRecord record = getZNRecord(line);
-      long timestamp = Long.parseLong(getAttributeValue(line, "time:"));
-      if (timestamp >= start && timestamp <= end && timestamp > lastCSUpdateTimestamp) {
-        lastCSUpdateTimestamp = timestamp;
-        lastCSUpdateLine = line;
-      }
-    }
-    assert (lastCSUpdateLine != null) : "No CS update between " + start + " - " + end;
-    return lastCSUpdateLine;
-  }
-
-  static ZNRecord getZNRecord(String line) {
-    ZNRecord record = null;
-    String value = getAttributeValue(line, "data:");
-    if (value != null) {
-      record = (ZNRecord) _deserializer.deserialize(value.getBytes());
-      // if (record == null)
-      // {
-      // System.out.println(line);
-      // }
-    }
-    return record;
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length != 3) {
-      System.err
-          .println("USAGE: ZkLogAnalyzer zkLogDir clusterName testStartTime (yyMMdd_hhmmss_SSS)");
-      System.exit(1);
-    }
-
-    System.out.println("ZkLogAnalyzer called with args: " + Arrays.toString(args));
-    // get create-timestamp of "/" + clusterName
-    // find all zk logs after that create-timestamp and parse them
-    // save parsed log in /tmp/zkLogAnalyzor_zklog.parsed0,1,2...
-
-    String zkLogDir = args[0];
-    String clusterName = args[1];
-    // String zkAddr = args[2];
-    String startTimeStr = args[2];
-    // ZkClient zkClient = new ZkClient(zkAddr);
-    // Stat clusterCreateStat = zkClient.getStat("/" + clusterName);
-    SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS");
-    Date date = formatter.parse(startTimeStr);
-    long startTimeStamp = date.getTime();
-
-    System.out.println(clusterName + " created at " + date);
-    while (zkLogDir.endsWith("/")) {
-      zkLogDir = zkLogDir.substring(0, zkLogDir.length() - 1);
-    }
-    if (!zkLogDir.endsWith("/version-2")) {
-      zkLogDir = zkLogDir + "/version-2";
-    }
-    File dir = new File(zkLogDir);
-    File[] zkLogs = dir.listFiles(new FileFilter() {
-
-      @Override
-      public boolean accept(File file) {
-        return file.isFile() && (file.getName().indexOf("log") != -1);
-      }
-    });
-
-    // lastModified time -> zkLog
-    TreeMap<Long, String> lastZkLogs = new TreeMap<Long, String>();
-    for (File file : zkLogs) {
-      if (file.lastModified() > startTimeStamp) {
-        lastZkLogs.put(file.lastModified(), file.getAbsolutePath());
-      }
-    }
-
-    List<String> parsedZkLogs = new ArrayList<String>();
-    int i = 0;
-    System.out.println("zk logs last modified later than " + new Timestamp(startTimeStamp));
-    for (Long lastModified : lastZkLogs.keySet()) {
-      String fileName = lastZkLogs.get(lastModified);
-      System.out.println(new Timestamp(lastModified) + ": "
-          + (fileName.substring(fileName.lastIndexOf('/') + 1)));
-
-      String parsedFileName = "zkLogAnalyzor_zklog.parsed" + i;
-      i++;
-      ZKLogFormatter.main(new String[] {
-          "log", fileName, parsedFileName
-      });
-      parsedZkLogs.add(parsedFileName);
-    }
-
-    // sessionId -> create liveInstance line
-    Map<String, String> sessionMap = new HashMap<String, String>();
-
-    // message send lines in time order
-    // List<String> sendMessageLines = new ArrayList<String>();
-
-    // CS update lines in time order
-    List<String> csUpdateLines = new ArrayList<String>();
-
-    String leaderSession = null;
-
-    System.out.println();
-    Stats stats = new Stats();
-    long lastTestStartTimestamp = Long.MAX_VALUE;
-    long controllerStartTime = 0;
-    for (String parsedZkLog : parsedZkLogs) {
-
-      FileInputStream fis = new FileInputStream(parsedZkLog);
-      BufferedReader br = new BufferedReader(new InputStreamReader(fis));
-
-      String inputLine;
-      while ((inputLine = br.readLine()) != null) {
-        String timestamp = getAttributeValue(inputLine, "time:");
-        if (timestamp == null) {
-          continue;
-        }
-        long timestampVal = Long.parseLong(timestamp);
-        if (timestampVal < startTimeStamp) {
-          continue;
-        }
-
-        if (dump == true) {
-          String printLine = inputLine.replaceAll("data:.*", "");
-          printLine =
-              new Timestamp(timestampVal) + " "
-                  + printLine.substring(printLine.indexOf("session:"));
-          System.err.println(printLine);
-        }
-
-        if (inputLine.indexOf("/start_disable") != -1) {
-          dump = true;
-        }
-        if (inputLine.indexOf("/" + clusterName + "/CONFIGS/CLUSTER/verify") != -1) {
-          String type = getAttributeValue(inputLine, "type:");
-          if (type.equals("delete")) {
-            System.out.println(timestamp + ": verify done");
-            System.out.println("lastTestStartTimestamp:" + lastTestStartTimestamp);
-            String lastCSUpdateLine =
-                findLastCSUpdateBetween(csUpdateLines, lastTestStartTimestamp, timestampVal);
-            long lastCSUpdateTimestamp =
-                Long.parseLong(getAttributeValue(lastCSUpdateLine, "time:"));
-            System.out.println("Last CS Update:" + lastCSUpdateTimestamp);
-
-            System.out.println("state transition latency: "
-                + +(lastCSUpdateTimestamp - lastTestStartTimestamp) + "ms");
-
-            System.out.println("state transition latency since controller start: "
-                + +(lastCSUpdateTimestamp - controllerStartTime) + "ms");
-
-            System.out.println("Create MSG\t" + stats.msgSentCount + "\t" + stats.msgSentCount_O2S
-                + "\t" + stats.msgSentCount_S2M + "\t" + stats.msgSentCount_M2S);
-            System.out.println("Modify MSG\t" + stats.msgModifyCount);
-            System.out.println("Delete MSG\t" + stats.msgDeleteCount);
-            System.out.println("Create CS\t" + stats.curStateCreateCount);
-            System.out.println("Update CS\t" + stats.curStateUpdateCount);
-            System.out.println("Create EV\t" + stats.extViewCreateCount);
-            System.out.println("Update EV\t" + stats.extViewUpdateCount);
-
-            System.out.println();
-            stats = new Stats();
-            lastTestStartTimestamp = Long.MAX_VALUE;
-          }
-        } else if (inputLine.indexOf("/" + clusterName + "/LIVEINSTANCES/") != -1) {
-          // cluster startup
-          if (timestampVal < lastTestStartTimestamp) {
-            System.out.println("START cluster. SETTING lastTestStartTimestamp to "
-                + new Timestamp(timestampVal) + "\nline:" + inputLine);
-            lastTestStartTimestamp = timestampVal;
-          }
-
-          ZNRecord record = getZNRecord(inputLine);
-          LiveInstance liveInstance = new LiveInstance(record);
-          String session = getAttributeValue(inputLine, "session:");
-          sessionMap.put(session, inputLine);
-          System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LIVEINSTANCE "
-              + liveInstance.getInstanceName());
-        } else if (inputLine.indexOf("closeSession") != -1) {
-          // kill any instance
-          String session = getAttributeValue(inputLine, "session:");
-          if (sessionMap.containsKey(session)) {
-            if (timestampVal < lastTestStartTimestamp) {
-              System.out.println("KILL node. SETTING lastTestStartTimestamp to " + timestampVal
-                  + " line:" + inputLine);
-              lastTestStartTimestamp = timestampVal;
-            }
-            String line = sessionMap.get(session);
-            ZNRecord record = getZNRecord(line);
-            LiveInstance liveInstance = new LiveInstance(record);
-
-            System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": close session "
-                + liveInstance.getInstanceName());
-            dump = true;
-          }
-        } else if (inputLine.indexOf("/" + clusterName + "/CONFIGS/PARTICIPANT") != -1) {
-          // disable a partition
-          String type = getAttributeValue(inputLine, "type:");
-          if (type.equals("setData") && inputLine.indexOf("HELIX_DISABLED_PARTITION") != -1) {
-            if (timestampVal < lastTestStartTimestamp) {
-              System.out.println("DISABLE partition. SETTING lastTestStartTimestamp to "
-                  + timestampVal + " line:" + inputLine);
-              lastTestStartTimestamp = timestampVal;
-            }
-          }
-        } else if (inputLine.indexOf("/" + clusterName + "/CONTROLLER/LEADER") != -1) {
-          // leaderLine = inputLine;
-          ZNRecord record = getZNRecord(inputLine);
-          LiveInstance liveInstance = new LiveInstance(record);
-          String session = getAttributeValue(inputLine, "session:");
-          leaderSession = session;
-          controllerStartTime = Long.parseLong(getAttributeValue(inputLine, "time:"));
-          sessionMap.put(session, inputLine);
-          System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LEADER "
-              + liveInstance.getInstanceName());
-        } else if (inputLine.indexOf("/" + clusterName + "/") != -1
-            && inputLine.indexOf("/CURRENTSTATES/") != -1) {
-          String type = getAttributeValue(inputLine, "type:");
-          if (type.equals("create")) {
-            stats.curStateCreateCount++;
-          } else if (type.equals("setData")) {
-            String path = getAttributeValue(inputLine, "path:");
-            csUpdateLines.add(inputLine);
-            stats.curStateUpdateCount++;
-            // getAttributeValue(line, "data");
-            System.out.println("Update currentstate:" + new Timestamp(Long.parseLong(timestamp))
-                + ":" + timestamp + " path:" + path);
-          }
-        } else if (inputLine.indexOf("/" + clusterName + "/EXTERNALVIEW/") != -1) {
-          String session = getAttributeValue(inputLine, "session:");
-          if (session.equals(leaderSession)) {
-            String type = getAttributeValue(inputLine, "type:");
-            if (type.equals("create")) {
-              stats.extViewCreateCount++;
-            } else if (type.equals("setData")) {
-              stats.extViewUpdateCount++;
-            }
-          }
-
-          // pos = inputLine.indexOf("EXTERNALVIEW");
-          // pos = inputLine.indexOf("data:{", pos);
-          // if (pos != -1)
-          // {
-          // String timestamp = getAttributeValue(inputLine, "time:");
-          // ZNRecord record =
-          // (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
-          // .getBytes());
-          // ExternalView extView = new ExternalView(record);
-          // int masterCnt = ClusterStateVerifier.countStateNbInExtView(extView,
-          // "MASTER");
-          // int slaveCnt = ClusterStateVerifier.countStateNbInExtView(extView, "SLAVE");
-          // if (masterCnt == 1200)
-          // {
-          // System.out.println(timestamp + ": externalView " + extView.getResourceName()
-          // + " has " + masterCnt + " MASTER, " + slaveCnt + " SLAVE");
-          // }
-          // }
-        } else if (inputLine.indexOf("/" + clusterName + "/") != -1
-            && inputLine.indexOf("/MESSAGES/") != -1) {
-          String type = getAttributeValue(inputLine, "type:");
-
-          if (type.equals("create")) {
-            ZNRecord record = getZNRecord(inputLine);
-            Message msg = new Message(record);
-            String sendSession = getAttributeValue(inputLine, "session:");
-            if (sendSession.equals(leaderSession) && msg.getMsgType().equals("STATE_TRANSITION")
-                && msg.getMsgState() == MessageState.NEW) {
-              // sendMessageLines.add(inputLine);
-              stats.msgSentCount++;
-
-              if (msg.getTypedFromState().toString().equals("OFFLINE")
-                  && msg.getTypedToState().toString().equals("SLAVE")) {
-                stats.msgSentCount_O2S++;
-              } else if (msg.getTypedFromState().toString().equals("SLAVE")
-                  && msg.getTypedToState().toString().equals("MASTER")) {
-                stats.msgSentCount_S2M++;
-              } else if (msg.getTypedFromState().toString().equals("MASTER")
-                  && msg.getTypedToState().toString().equals("SLAVE")) {
-                stats.msgSentCount_M2S++;
-              }
-              // System.out.println("Message create:"+new
-              // Timestamp(Long.parseLong(timestamp)));
-            }
-
-            // pos = inputLine.indexOf("MESSAGES");
-            // pos = inputLine.indexOf("data:{", pos);
-            // if (pos != -1)
-            // {
-            //
-            // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
-            // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
-            // Message msg = new Message(record);
-            // MessageState msgState = msg.getMsgState();
-            // String msgType = msg.getMsgType();
-            // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.NEW)
-            // {
-            // if (!msgs.containsKey(msg.getMsgId()))
-            // {
-            // msgs.put(msg.getMsgId(), new MsgItem(Long.parseLong(timestamp), msg));
-            // }
-            // else
-            // {
-            // LOG.error("msg: " + msg.getMsgId() + " already sent");
-            // }
-            //
-            // System.out.println(timestamp + ": sendMsg " + msg.getPartitionName() + "("
-            // + msg.getFromState() + "->" + msg.getToState() + ") to "
-            // + msg.getTgtName() + ", size: " + msgBytes.length);
-            // }
-            // }
-          } else if (type.equals("setData")) {
-            stats.msgModifyCount++;
-            // pos = inputLine.indexOf("MESSAGES");
-            // pos = inputLine.indexOf("data:{", pos);
-            // if (pos != -1)
-            // {
-            //
-            // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
-            // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
-            // Message msg = new Message(record);
-            // MessageState msgState = msg.getMsgState();
-            // String msgType = msg.getMsgType();
-            // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.READ)
-            // {
-            // if (!msgs.containsKey(msg.getMsgId()))
-            // {
-            // LOG.error("msg: " + msg.getMsgId() + " never sent");
-            // }
-            // else
-            // {
-            // MsgItem msgItem = msgs.get(msg.getMsgId());
-            // if (msgItem.readTime == 0)
-            // {
-            // msgItem.readTime = Long.parseLong(timestamp);
-            // msgs.put(msg.getMsgId(), msgItem);
-            // // System.out.println(timestamp + ": readMsg " + msg.getPartitionName()
-            // // + "("
-            // // + msg.getFromState() + "->" + msg.getToState() + ") to "
-            // // + msg.getTgtName() + ", latency: " + (msgItem.readTime -
-            // // msgItem.sendTime));
-            // }
-            // }
-            //
-            // }
-            // }
-          } else if (type.equals("delete")) {
-            stats.msgDeleteCount++;
-            // String msgId = path.substring(path.lastIndexOf('/') + 1);
-            // if (msgs.containsKey(msgId))
-            // {
-            // MsgItem msgItem = msgs.get(msgId);
-            // Message msg = msgItem.msg;
-            // msgItem.deleteTime = Long.parseLong(timestamp);
-            // msgs.put(msgId, msgItem);
-            // msgItem.latency = msgItem.deleteTime - msgItem.sendTime;
-            // System.out.println(timestamp + ": delMsg " + msg.getPartitionName() + "("
-            // + msg.getFromState() + "->" + msg.getToState() + ") to "
-            // + msg.getTgtName() + ", latency: " + msgItem.latency);
-            // }
-            // else
-            // {
-            // // messages other than STATE_TRANSITION message
-            // // LOG.error("msg: " + msgId + " never sent");
-            // }
-          }
-        }
-      } // end of [br.readLine()) != null]
-    }
-  }
-}