Posted to commits@kylin.apache.org by lu...@apache.org on 2015/09/06 09:39:13 UTC

[07/50] [abbrv] incubator-kylin git commit: minor, ApiRequestParser compile with Java 6

minor, ApiRequestParser compile with Java 6
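
Beneath the whitespace churn, the substantive change in this commit is replacing a switch on a String value, which is a Java 7 language feature, with an if/else chain so the file compiles under -source 1.6. A minimal runnable sketch of the pattern, using the same DELETE/POST cases the diff touches (the "other" fallback is illustrative; the real code falls back to the action segment of the URI):

    public class StringSwitchJava6 {
        public static void main(String[] args) {
            // switch (method) { case "DELETE": ... } needs -source 1.7 or later;
            // under Java 6 the equivalent is an if/else chain on String.equals().
            String method = "DELETE"; // e.g. the HTTP method captured from a request log line
            String action;
            if ("DELETE".equals(method)) {
                action = "drop";
            } else if ("POST".equals(method)) {
                action = "save";
            } else {
                action = "other"; // illustrative fallback
            }
            System.out.println(method + " -> " + action);
        }
    }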


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3b275caf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3b275caf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3b275caf

Branch: refs/heads/master
Commit: 3b275caf880037e1ae4b008ad7c3a2a48ed3abe2
Parents: 81e1be4
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Aug 11 17:46:02 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Aug 11 17:46:02 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/monitor/ApiRequestParser.java  | 399 ++++++++++---------
 1 file changed, 206 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
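
Aside from the reindentation visible in the stat above, ApiRequestParser's job is to append parsed records as pipe-delimited CSV to a result file on HDFS. A minimal sketch of that write path, assuming a reachable HDFS with append support enabled and an already-existing target file (FileSystem.append does not create one); the /tmp path and the sample record are hypothetical:

    import java.io.OutputStreamWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import au.com.bytecode.opencsv.CSVWriter;

    public class HdfsCsvAppendSketch {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            Path target = new Path("/tmp/kylin_request_log.csv"); // hypothetical result file
            OutputStreamWriter writer = new OutputStreamWriter(fs.append(target));
            CSVWriter cwriter = new CSVWriter(writer, '|', CSVWriter.NO_QUOTE_CHARACTER);
            try {
                cwriter.writeNext(new String[] { "ADMIN", "2015-08-11", "200" }); // sample record
            } finally {
                cwriter.close(); // flushes and closes the underlying stream writer
                fs.close();
            }
        }
    }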


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3b275caf/monitor/src/main/java/org/apache/kylin/monitor/ApiRequestParser.java
----------------------------------------------------------------------
diff --git a/monitor/src/main/java/org/apache/kylin/monitor/ApiRequestParser.java b/monitor/src/main/java/org/apache/kylin/monitor/ApiRequestParser.java
index 1125d07..b3a1f40 100644
--- a/monitor/src/main/java/org/apache/kylin/monitor/ApiRequestParser.java
+++ b/monitor/src/main/java/org/apache/kylin/monitor/ApiRequestParser.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.monitor;
 
@@ -48,197 +48,210 @@ import au.com.bytecode.opencsv.CSVWriter;
  */
 public class ApiRequestParser {
 
-    final static Logger logger = Logger.getLogger(ApiRequestParser.class);
-    final static Charset ENCODING = StandardCharsets.UTF_8;
-    static String REQUEST_PARSE_RESULT_PATH = null;
-    final static String REQUEST_LOG_FILE_PATTERN = "kylin_request.log.(\\d{4}-\\d{2}-\\d{2})$";
-    final static String REQUEST_LOG_PARSE_RESULT_FILENAME = "kylin_request_log.csv";
-    static String DEPLOY_ENV;
-
-    final static String[] KYLIN_REQUEST_CSV_HEADER = { "REQUESTER", "REQ_TIME", "REQ_DATE", "URI", "METHOD", "QUERY_STRING", "PAYLOAD", "RESP_STATUS", "TARGET", "ACTION", "DEPLOY_ENV" };
-
-    private ConfigUtils monitorConfig;
-
-    public ApiRequestParser() {
-        monitorConfig = ConfigUtils.getInstance();
-        try {
-            monitorConfig.loadMonitorParam();
-            DEPLOY_ENV = monitorConfig.getDeployEnv();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public void start() throws IOException, ParseException {
-        ApiRequestParser.REQUEST_PARSE_RESULT_PATH = ConfigUtils.getInstance().getRequestLogParseResultDir() + REQUEST_LOG_PARSE_RESULT_FILENAME;
-        this.parseRequestInit();
-
-        //get api req log files have been read
-        String[] hasReadFiles = MonitorMetaManager.getReadApiReqLogFileList();
-
-        List<File> files = this.getRequestLogFiles();
-        for (File file : files) {
-            if (!Arrays.asList(hasReadFiles).contains(file.getName())) {
-                this.parseRequestLog(file.getPath(), ApiRequestParser.REQUEST_PARSE_RESULT_PATH);
-                MonitorMetaManager.markApiReqLogFileAsRead(file.getName());
-            }
-        }
-    }
-
-    public void parseRequestInit() throws IOException {
-        logger.info("parse api request initializing...");
-        FileSystem fs = null;
-        try {
-            Configuration conf = new Configuration();
-            fs = FileSystem.get(conf);
-            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(ApiRequestParser.REQUEST_PARSE_RESULT_PATH);
-            if (!fs.exists(path)) {
-                fs.create(path);
-
-                //need to close before get FileSystem again
-                fs.close();
-                this.writeResultToHdfs(ApiRequestParser.REQUEST_PARSE_RESULT_PATH, ApiRequestParser.KYLIN_REQUEST_CSV_HEADER);
-            }
-        } catch (Exception e) {
-            fs.close();
-            logger.info("Failed to init:", e);
-        }
-    }
-
-    //parse query log and convert to csv file to hdfs
-    public void parseRequestLog(String filePath, String dPath) throws ParseException, IOException {
-
-        logger.info("Start parsing kylin api request file " + filePath + " !");
-
-        //        writer config init
-        FileSystem fs = this.getHdfsFileSystem();
-        org.apache.hadoop.fs.Path resultStorePath = new org.apache.hadoop.fs.Path(dPath);
-        OutputStreamWriter writer = new OutputStreamWriter(fs.append(resultStorePath));
-        CSVWriter cwriter = new CSVWriter(writer, '|', CSVWriter.NO_QUOTE_CHARACTER);
-
-        Pattern p_available = Pattern.compile("/kylin/api/(cubes|user)+.*");
-        Pattern p_request = Pattern.compile("^.*\\[.*KylinApiFilter.logRequest.*\\].*REQUEST:.*REQUESTER=(.*);REQ_TIME=(\\w+ (\\d{4}-\\d{2}-\\d{2}).*);URI=(.*);METHOD=(.*);QUERY_STRING=(.*);PAYLOAD=(.*);RESP_STATUS=(.*);$");
-        Pattern p_uri = Pattern.compile("/kylin/api/(\\w+)(/.*/)*(.*)$");
-        Matcher m_available = p_available.matcher("");
-        Matcher m_request = p_request.matcher("");
-        Matcher m_uri = p_uri.matcher("");
-
-        Path path = Paths.get(filePath);
-        try {
-            BufferedReader reader = Files.newBufferedReader(path, ENCODING);
-            String line = null;
-            while ((line = reader.readLine()) != null) {
-                //reset the input
-                m_available.reset(line);
-                m_request.reset(line);
-
-                //filter unnecessary info
-                if (m_available.find()) {
-                    //filter GET info
-                    if (m_request.find() && !m_request.group(5).equals("GET")) {
-
-                        List<String> groups = new ArrayList<String>();
-                        for (int i = 1; i <= m_request.groupCount(); i++) {
-                            groups.add(m_request.group(i));
-                        }
-
-                        String uri = m_request.group(4);
-                        m_uri.reset(uri);
-                        if (m_uri.find()) {
-
-                            //add target
-                            groups.add(m_uri.group(1));
-
-                            //add action
-                            if (m_uri.group(1).equals("cubes")) {
-                                switch (m_request.group(5)) {
-                                case "DELETE":
-                                    groups.add("drop");
-                                    break;
-                                case "POST":
-                                    groups.add("save");
-                                    break;
-                                default:
-                                    //add parse action
-                                    groups.add(m_uri.group(3));
-                                    break;
-                                }
-                            }
-
-                        }
-                        groups.add(DEPLOY_ENV);
-                        String[] recordArray = groups.toArray(new String[groups.size()]);
-                        //write to hdfs
-                        cwriter.writeNext(recordArray);
-                    }
-                }
-
-            }
-        } catch (IOException ex) {
-            logger.info("Failed to write to hdfs:", ex);
-        } finally {
-            writer.close();
-            cwriter.close();
-            fs.close();
-        }
-
-        logger.info("Finish parsing file " + filePath + " !");
-    }
-
-    public void writeResultToHdfs(String dPath, String[] record) throws IOException {
-        OutputStreamWriter writer = null;
-        CSVWriter cwriter = null;
-        FileSystem fs = null;
-        try {
-
-            fs = this.getHdfsFileSystem();
-            org.apache.hadoop.fs.Path resultStorePath = new org.apache.hadoop.fs.Path(dPath);
-            writer = new OutputStreamWriter(fs.append(resultStorePath));
-            cwriter = new CSVWriter(writer, '|', CSVWriter.NO_QUOTE_CHARACTER);
-            cwriter.writeNext(record);
-
-        } catch (IOException e) {
-            logger.info("Exception", e);
-        } finally {
-            writer.close();
-            cwriter.close();
-            fs.close();
-        }
-    }
-
-    public List<File> getRequestLogFiles() {
-        List<File> logFiles = new ArrayList<File>();
-
-        //        String request_log_file_pattern = monitorConfig.getRequestLogFilePattern();
-
-        List<String> request_log_dir_list = monitorConfig.getLogBaseDir();
-        FileFilter filter = new RegexFileFilter(REQUEST_LOG_FILE_PATTERN);
-
-        for (String path : request_log_dir_list) {
-            logger.info("fetch api request log file from path:" + path);
-            File request_log_dir = new File(path);
-            File[] request_log_files = request_log_dir.listFiles(filter);
-            if (request_log_files == null) {
-                logger.warn("no api request log file found under path" + path);
-                break;
-            }
-            Collections.addAll(logFiles, request_log_files);
-        }
-
-        return logFiles;
-    }
-
-    public FileSystem getHdfsFileSystem() throws IOException {
-        Configuration conf = new Configuration();
-        //        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
-        FileSystem fs = null;
-        try {
-            fs = FileSystem.get(conf);
-        } catch (IOException e) {
-            fs.close();
-            logger.info("Failed to get hdfs FileSystem", e);
-        }
-        return fs;
-    }
+	final static Logger logger = Logger.getLogger(ApiRequestParser.class);
+	final static Charset ENCODING = StandardCharsets.UTF_8;
+	static String REQUEST_PARSE_RESULT_PATH = null;
+	final static String REQUEST_LOG_FILE_PATTERN = "kylin_request.log.(\\d{4}-\\d{2}-\\d{2})$";
+	final static String REQUEST_LOG_PARSE_RESULT_FILENAME = "kylin_request_log.csv";
+	static String DEPLOY_ENV;
+
+	final static String[] KYLIN_REQUEST_CSV_HEADER = { "REQUESTER", "REQ_TIME",
+			"REQ_DATE", "URI", "METHOD", "QUERY_STRING", "PAYLOAD",
+			"RESP_STATUS", "TARGET", "ACTION", "DEPLOY_ENV" };
+
+	private ConfigUtils monitorConfig;
+
+	public ApiRequestParser() {
+		monitorConfig = ConfigUtils.getInstance();
+		try {
+			monitorConfig.loadMonitorParam();
+			DEPLOY_ENV = monitorConfig.getDeployEnv();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+
+	public void start() throws IOException, ParseException {
+		ApiRequestParser.REQUEST_PARSE_RESULT_PATH = ConfigUtils.getInstance()
+				.getRequestLogParseResultDir()
+				+ REQUEST_LOG_PARSE_RESULT_FILENAME;
+		this.parseRequestInit();
+
+		// get api req log files have been read
+		String[] hasReadFiles = MonitorMetaManager.getReadApiReqLogFileList();
+
+		List<File> files = this.getRequestLogFiles();
+		for (File file : files) {
+			if (!Arrays.asList(hasReadFiles).contains(file.getName())) {
+				this.parseRequestLog(file.getPath(),
+						ApiRequestParser.REQUEST_PARSE_RESULT_PATH);
+				MonitorMetaManager.markApiReqLogFileAsRead(file.getName());
+			}
+		}
+	}
+
+	public void parseRequestInit() throws IOException {
+		logger.info("parse api request initializing...");
+		FileSystem fs = null;
+		try {
+			Configuration conf = new Configuration();
+			fs = FileSystem.get(conf);
+			org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(
+					ApiRequestParser.REQUEST_PARSE_RESULT_PATH);
+			if (!fs.exists(path)) {
+				fs.create(path);
+
+				// need to close before get FileSystem again
+				fs.close();
+				this.writeResultToHdfs(
+						ApiRequestParser.REQUEST_PARSE_RESULT_PATH,
+						ApiRequestParser.KYLIN_REQUEST_CSV_HEADER);
+			}
+		} catch (Exception e) {
+			fs.close();
+			logger.info("Failed to init:", e);
+		}
+	}
+
+	// parse query log and convert to csv file to hdfs
+	public void parseRequestLog(String filePath, String dPath)
+			throws ParseException, IOException {
+
+		logger.info("Start parsing kylin api request file " + filePath + " !");
+
+		// writer config init
+		FileSystem fs = this.getHdfsFileSystem();
+		org.apache.hadoop.fs.Path resultStorePath = new org.apache.hadoop.fs.Path(
+				dPath);
+		OutputStreamWriter writer = new OutputStreamWriter(
+				fs.append(resultStorePath));
+		CSVWriter cwriter = new CSVWriter(writer, '|',
+				CSVWriter.NO_QUOTE_CHARACTER);
+
+		Pattern p_available = Pattern.compile("/kylin/api/(cubes|user)+.*");
+		Pattern p_request = Pattern
+				.compile("^.*\\[.*KylinApiFilter.logRequest.*\\].*REQUEST:.*REQUESTER=(.*);REQ_TIME=(\\w+ (\\d{4}-\\d{2}-\\d{2}).*);URI=(.*);METHOD=(.*);QUERY_STRING=(.*);PAYLOAD=(.*);RESP_STATUS=(.*);$");
+		Pattern p_uri = Pattern.compile("/kylin/api/(\\w+)(/.*/)*(.*)$");
+		Matcher m_available = p_available.matcher("");
+		Matcher m_request = p_request.matcher("");
+		Matcher m_uri = p_uri.matcher("");
+
+		Path path = Paths.get(filePath);
+		try {
+			BufferedReader reader = Files.newBufferedReader(path, ENCODING);
+			String line = null;
+			while ((line = reader.readLine()) != null) {
+				// reset the input
+				m_available.reset(line);
+				m_request.reset(line);
+
+				// filter unnecessary info
+				if (m_available.find()) {
+					// filter GET info
+					if (m_request.find() && !m_request.group(5).equals("GET")) {
+
+						List<String> groups = new ArrayList<String>();
+						for (int i = 1; i <= m_request.groupCount(); i++) {
+							groups.add(m_request.group(i));
+						}
+
+						String uri = m_request.group(4);
+						m_uri.reset(uri);
+						if (m_uri.find()) {
+
+							// add target
+							groups.add(m_uri.group(1));
+
+							// add action
+							if (m_uri.group(1).equals("cubes")) {
+								String method = m_request.group(5);
+								if ("DELETE".equals(method)) {
+									groups.add("drop");
+								} else if ("POST".equals(method)) {
+									groups.add("save");
+								} else {
+									// add parse action
+									groups.add(m_uri.group(3));
+								}
+							}
+						}
+						groups.add(DEPLOY_ENV);
+						String[] recordArray = groups.toArray(new String[groups
+								.size()]);
+						// write to hdfs
+						cwriter.writeNext(recordArray);
+					}
+				}
+
+			}
+		} catch (IOException ex) {
+			logger.info("Failed to write to hdfs:", ex);
+		} finally {
+			writer.close();
+			cwriter.close();
+			fs.close();
+		}
+
+		logger.info("Finish parsing file " + filePath + " !");
+	}
+
+	public void writeResultToHdfs(String dPath, String[] record)
+			throws IOException {
+		OutputStreamWriter writer = null;
+		CSVWriter cwriter = null;
+		FileSystem fs = null;
+		try {
+
+			fs = this.getHdfsFileSystem();
+			org.apache.hadoop.fs.Path resultStorePath = new org.apache.hadoop.fs.Path(
+					dPath);
+			writer = new OutputStreamWriter(fs.append(resultStorePath));
+			cwriter = new CSVWriter(writer, '|', CSVWriter.NO_QUOTE_CHARACTER);
+			cwriter.writeNext(record);
+
+		} catch (IOException e) {
+			logger.info("Exception", e);
+		} finally {
+			writer.close();
+			cwriter.close();
+			fs.close();
+		}
+	}
+
+	public List<File> getRequestLogFiles() {
+		List<File> logFiles = new ArrayList<File>();
+
+		// String request_log_file_pattern =
+		// monitorConfig.getRequestLogFilePattern();
+
+		List<String> request_log_dir_list = monitorConfig.getLogBaseDir();
+		FileFilter filter = new RegexFileFilter(REQUEST_LOG_FILE_PATTERN);
+
+		for (String path : request_log_dir_list) {
+			logger.info("fetch api request log file from path:" + path);
+			File request_log_dir = new File(path);
+			File[] request_log_files = request_log_dir.listFiles(filter);
+			if (request_log_files == null) {
+				logger.warn("no api request log file found under path" + path);
+				break;
+			}
+			Collections.addAll(logFiles, request_log_files);
+		}
+
+		return logFiles;
+	}
+
+	public FileSystem getHdfsFileSystem() throws IOException {
+		Configuration conf = new Configuration();
+		// conf.set("dfs.client.block.write.replace-datanode-on-failure.policy",
+		// "NEVER");
+		FileSystem fs = null;
+		try {
+			fs = FileSystem.get(conf);
+		} catch (IOException e) {
+			logger.info("Failed to get hdfs FileSystem", e);
+		}
+		return fs;
+	}
 
 }
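
For context on what the reformatted method actually does: parseRequestLog filters each log line through p_available, extracts fields with p_request, and derives the TARGET/ACTION columns from the URI with p_uri. A standalone sketch that runs the two extraction patterns (copied verbatim from the diff) against a made-up log line; the sample line and its field values are illustrative, not taken from a real Kylin log:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class RequestLineDemo {
        public static void main(String[] args) {
            // Patterns as they appear in ApiRequestParser.parseRequestLog
            Pattern pRequest = Pattern.compile("^.*\\[.*KylinApiFilter.logRequest.*\\].*REQUEST:.*REQUESTER=(.*);REQ_TIME=(\\w+ (\\d{4}-\\d{2}-\\d{2}).*);URI=(.*);METHOD=(.*);QUERY_STRING=(.*);PAYLOAD=(.*);RESP_STATUS=(.*);$");
            Pattern pUri = Pattern.compile("/kylin/api/(\\w+)(/.*/)*(.*)$");

            // Hypothetical log line in the layout the regex expects
            String line = "2015-08-11 17:46:02,123 DEBUG [KylinApiFilter.logRequest] - REQUEST: "
                    + "REQUESTER=ADMIN;REQ_TIME=Tue 2015-08-11 17:46:02;URI=/kylin/api/cubes/sample_cube/rebuild;"
                    + "METHOD=PUT;QUERY_STRING=null;PAYLOAD={};RESP_STATUS=200;";

            Matcher m = pRequest.matcher(line);
            if (m.find()) {
                System.out.println("REQUESTER = " + m.group(1)); // ADMIN
                System.out.println("METHOD    = " + m.group(5)); // PUT
                Matcher u = pUri.matcher(m.group(4));
                if (u.find()) {
                    System.out.println("TARGET = " + u.group(1)); // cubes
                    System.out.println("ACTION = " + u.group(3)); // rebuild
                }
            }
        }
    }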