You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "ldssea (JIRA)" <ji...@apache.org> on 2016/02/05 02:38:39 UTC

[jira] [Updated] (FLUME-2878) add field projecting Interceptor

     [ https://issues.apache.org/jira/browse/FLUME-2878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ldssea updated FLUME-2878:
--------------------------
    Description: 
Add a field-projecting Interceptor.
The interceptor splits each collected record on the configured separator and then outputs only the fields the user asks for, selected by their 1-based position and emitted in the requested order.
For Example:
data format before projecting
field1|field2|field3|field4|field5
result after projecting
field2|field5|field3


  was:
package com.huawei.streaming.application.flume.interceptors.fieldprojecting;

import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.huawei.streaming.application.flume.utils.LogUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.huawei.streaming.application.flume.interceptors.standardize.StandardizeInterceptors;

/**
 * 字段投影功能 过滤出用户需要的字段
 * 
 * @author l00349086
 * 
 */
public class FieldProjectingInterceptors implements Interceptor {

	private final String separator;
	private final String[] outFields;
	private final boolean trim;
	private String line;
	private Charset charset = Charset.defaultCharset();
	private static final Logger logger = LoggerFactory
			.getLogger(StandardizeInterceptors.class);

	private FieldProjectingInterceptors(String separator, String[] outFields,
			boolean trim) {
		this.separator = separator;
		this.outFields = outFields;
		this.trim = trim;
	}

	@Override
	public void close() {
		// TODO Auto-generated method stub

	}

	@Override
	public void initialize() {
		// TODO Auto-generated method stub

	}

	@Override
	public Event intercept(Event event) {
		// TODO Auto-generated method stub
		String data = new String(event.getBody(), charset);
		line = data;
		String[] datas = {};
		try {
			datas = StringUtils.splitPreserveAllTokens(data, separator);
			data = fieldProject(datas);
			if (StringUtils.isNotEmpty(data)) {
				Event newEvent = new SimpleEvent();
				newEvent.setHeaders(event.getHeaders());
				newEvent.setBody(data.getBytes(charset));
				return newEvent;
			} else {
				return null;
			}

		} catch (Exception e) {
		    LogUtils.error(logger, "error line[" + line + "]");
		    LogUtils.error(logger, "interceptor event exception : ", e);
			return null;
		}

	}

	@Override
	public List<Event> intercept(List<Event> events) {
		// TODO Auto-generated method stub
		List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
		for (Event event : events) {
			Event interceptedEvent = intercept(event);
			if (interceptedEvent != null) {
				intercepted.add(interceptedEvent);
			}
		}
		return intercepted;
	}

	private String fieldProject(String[] datas) {
		StringBuilder build = new StringBuilder();
		int length = outFields.length;
		try {
			int i = 1;
			for (String field : outFields) {
				int index = Integer.valueOf(field);
				if (trim) {
					build.append(datas[index - 1].trim());
				} else {
					build.append(datas[index - 1]);
				}

				if (i != length) {
					build.append(separator);
				}
				i++;
			}
			return build.toString();
		} catch (Exception e) {
		    LogUtils.error(logger, "error line[" + line + "]");
		    LogUtils.error(logger, "fieldProject exception : ", e);
			return "";
		}

	}

	public static class Builder implements Interceptor.Builder {
		private String separator;
		private boolean trim;
		private String[] outFields = {};

		@Override
		public void configure(Context context) {
			// 分隔符考虑空格 和 tab符
			separator = context.getString(Constants.SEPARATOR, "|");
			if (StringUtils.isNotEmpty(separator)) {
				if (separator.equalsIgnoreCase("space")) {
					separator = Constants.SPACE;
				}
				if (separator.equalsIgnoreCase("tab")) {
					separator = Constants.TAB;
				}
			}

			String fields = context.getString(Constants.OUTPUT_FIELD, "");
			if (StringUtils.isNotEmpty(fields)) {
				// 处理多空格情况 全部替换为一个空格
				String regex = "\\s+";
				fields = fields.replaceAll(regex, Constants.SPACE);

				outFields = StringUtils.splitPreserveAllTokens(fields,
						Constants.SPACE);
				// 检查outFields,必须是大于0的正整数
				Preconditions.checkArgument(checkStrings(outFields),
						"Supplied outputFields must be number.");
			}
			Preconditions.checkArgument(outFields.length != 0,
					"Supplied outputFields is null.");

			trim = context.getBoolean(Constants.TRIM, false);
		}

		private boolean checkStrings(String[] params) {
			boolean boo = true;
			if (params.length == 0) {
				return false;
			}

			for (String param : params) {
				boo = checkString(param);
				if (!boo) {
					return false;
				}
			}

			return true;
		}

		private boolean checkString(String param) {
			try {
				int flag = Integer.valueOf(param);
				if (flag > 0) {
					return true;
				} else {
					return false;
				}
			} catch (Exception e) {
				return false;
			}
		}

		@Override
		public Interceptor build() {
			// TODO Auto-generated method stub
			return new FieldProjectingInterceptors(separator, outFields, trim);
		}

	}

	public static class Constants {
		public static final String SEPARATOR = "separator";

		public static final String OUTPUT_FIELD = "outputFields";

		public static final String TRIM = "trim";

		public static final String SPACE = " ";

		public static final String TAB = "\t";
	}

}



> add field projecting Interceptor
> --------------------------------
>
>                 Key: FLUME-2878
>                 URL: https://issues.apache.org/jira/browse/FLUME-2878
>             Project: Flume
>          Issue Type: New Feature
>          Components: Sinks+Sources
>         Environment: NA
>            Reporter: ldssea
>   Original Estimate: 8h
>  Remaining Estimate: 8h
>
> Add a field-projecting Interceptor.
> The interceptor splits each collected record on the configured separator and then outputs only the fields the user asks for, selected by their 1-based position and emitted in the requested order.
> For Example:
> data format before projecting
> field1|field2|field3|field4|field5
> result after projecting
> field2|field5|field3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)