You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "ldssea (JIRA)" <ji...@apache.org> on 2016/02/05 02:17:39 UTC
[jira] [Created] (FLUME-2878) add field projecting Interceptor
ldssea created FLUME-2878:
-----------------------------
Summary: add field projecting Interceptor
Key: FLUME-2878
URL: https://issues.apache.org/jira/browse/FLUME-2878
Project: Flume
Issue Type: New Feature
Components: Sinks+Sources
Environment: NA
Reporter: ldssea
package com.huawei.streaming.application.flume.interceptors.fieldprojecting;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.huawei.streaming.application.flume.utils.LogUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.huawei.streaming.application.flume.interceptors.standardize.StandardizeInterceptors;
/**
* 字段投影功能 过滤出用户需要的字段
*
* @author l00349086
*
*/
public class FieldProjectingInterceptors implements Interceptor {
private final String separator;
private final String[] outFields;
private final boolean trim;
private String line;
private Charset charset = Charset.defaultCharset();
private static final Logger logger = LoggerFactory
.getLogger(StandardizeInterceptors.class);
private FieldProjectingInterceptors(String separator, String[] outFields,
boolean trim) {
this.separator = separator;
this.outFields = outFields;
this.trim = trim;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public void initialize() {
// TODO Auto-generated method stub
}
@Override
public Event intercept(Event event) {
// TODO Auto-generated method stub
String data = new String(event.getBody(), charset);
line = data;
String[] datas = {};
try {
datas = StringUtils.splitPreserveAllTokens(data, separator);
data = fieldProject(datas);
if (StringUtils.isNotEmpty(data)) {
Event newEvent = new SimpleEvent();
newEvent.setHeaders(event.getHeaders());
newEvent.setBody(data.getBytes(charset));
return newEvent;
} else {
return null;
}
} catch (Exception e) {
LogUtils.error(logger, "error line[" + line + "]");
LogUtils.error(logger, "interceptor event exception : ", e);
return null;
}
}
@Override
public List<Event> intercept(List<Event> events) {
// TODO Auto-generated method stub
List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
for (Event event : events) {
Event interceptedEvent = intercept(event);
if (interceptedEvent != null) {
intercepted.add(interceptedEvent);
}
}
return intercepted;
}
private String fieldProject(String[] datas) {
StringBuilder build = new StringBuilder();
int length = outFields.length;
try {
int i = 1;
for (String field : outFields) {
int index = Integer.valueOf(field);
if (trim) {
build.append(datas[index - 1].trim());
} else {
build.append(datas[index - 1]);
}
if (i != length) {
build.append(separator);
}
i++;
}
return build.toString();
} catch (Exception e) {
LogUtils.error(logger, "error line[" + line + "]");
LogUtils.error(logger, "fieldProject exception : ", e);
return "";
}
}
public static class Builder implements Interceptor.Builder {
private String separator;
private boolean trim;
private String[] outFields = {};
@Override
public void configure(Context context) {
// 分隔符考虑空格 和 tab符
separator = context.getString(Constants.SEPARATOR, "|");
if (StringUtils.isNotEmpty(separator)) {
if (separator.equalsIgnoreCase("space")) {
separator = Constants.SPACE;
}
if (separator.equalsIgnoreCase("tab")) {
separator = Constants.TAB;
}
}
String fields = context.getString(Constants.OUTPUT_FIELD, "");
if (StringUtils.isNotEmpty(fields)) {
// 处理多空格情况 全部替换为一个空格
String regex = "\\s+";
fields = fields.replaceAll(regex, Constants.SPACE);
outFields = StringUtils.splitPreserveAllTokens(fields,
Constants.SPACE);
// 检查outFields,必须是大于0的正整数
Preconditions.checkArgument(checkStrings(outFields),
"Supplied outputFields must be number.");
}
Preconditions.checkArgument(outFields.length != 0,
"Supplied outputFields is null.");
trim = context.getBoolean(Constants.TRIM, false);
}
private boolean checkStrings(String[] params) {
boolean boo = true;
if (params.length == 0) {
return false;
}
for (String param : params) {
boo = checkString(param);
if (!boo) {
return false;
}
}
return true;
}
private boolean checkString(String param) {
try {
int flag = Integer.valueOf(param);
if (flag > 0) {
return true;
} else {
return false;
}
} catch (Exception e) {
return false;
}
}
@Override
public Interceptor build() {
// TODO Auto-generated method stub
return new FieldProjectingInterceptors(separator, outFields, trim);
}
}
public static class Constants {
public static final String SEPARATOR = "separator";
public static final String OUTPUT_FIELD = "outputFields";
public static final String TRIM = "trim";
public static final String SPACE = " ";
public static final String TAB = "\t";
}
}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)