You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ja...@apache.org on 2018/02/07 07:07:16 UTC

[09/10] eagle git commit: [EAGLE-1081] Checkstyle fixes for eagle-entity-base module

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
index c3d916e..6dfe27d 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
@@ -29,97 +29,103 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class GenericEntityStreamReader extends StreamReader {
-	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReader.class);
-	
-	private EntityDefinition entityDef;
-	private SearchCondition condition;
-	private String prefix;
-	private StreamReader readerAfterPlan;
-
-	public GenericEntityStreamReader(String serviceName, SearchCondition condition) throws InstantiationException, IllegalAccessException{
-		this(serviceName, condition, null);
-	}
-
-	public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition) throws InstantiationException, IllegalAccessException{
-		this(entityDef, condition, entityDef.getPrefix());
-	}
-	
-	public GenericEntityStreamReader(String serviceName, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
-		this.prefix = prefix;
-		checkNotNull(serviceName, "serviceName");
-		this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
-		checkNotNull(entityDef, "EntityDefinition");
-		this.condition = condition;
-		this.readerAfterPlan = selectQueryReader();
-	}
-
-	public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
-		this.prefix = prefix;
-		checkNotNull(entityDef, "entityDef");
-		this.entityDef = entityDef;
-		checkNotNull(entityDef, "EntityDefinition");
-		this.condition = condition;
-		this.readerAfterPlan = selectQueryReader();
-	}
-
-	private void checkNotNull(Object o, String message){
-		if(o == null){
-			throw new IllegalArgumentException(message + " should not be null");
-		}
-	}
-	
-	public EntityDefinition getEntityDefinition() {
-		return entityDef;
-	}
-	
-	public SearchCondition getSearchCondition() {
-		return condition;
-	}
-	
-	@Override
-	public void readAsStream() throws Exception{
-		readerAfterPlan._listeners.addAll(this._listeners);
-		readerAfterPlan.readAsStream();
-	}
-	
-	private StreamReader selectQueryReader() throws InstantiationException, IllegalAccessException {
-		final ORExpression query = condition.getQueryExpression();
-		IndexDefinition[] indexDefs = entityDef.getIndexes();
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReader.class);
+
+    private EntityDefinition entityDef;
+    private SearchCondition condition;
+    private String prefix;
+    private StreamReader readerAfterPlan;
+
+    public GenericEntityStreamReader(String serviceName, SearchCondition condition)
+        throws InstantiationException, IllegalAccessException {
+        this(serviceName, condition, null);
+    }
+
+    public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition)
+        throws InstantiationException, IllegalAccessException {
+        this(entityDef, condition, entityDef.getPrefix());
+    }
+
+    public GenericEntityStreamReader(String serviceName, SearchCondition condition, String prefix)
+        throws InstantiationException, IllegalAccessException {
+        this.prefix = prefix;
+        checkNotNull(serviceName, "serviceName");
+        this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+        checkNotNull(entityDef, "EntityDefinition");
+        this.condition = condition;
+        this.readerAfterPlan = selectQueryReader();
+    }
+
+    public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix)
+        throws InstantiationException, IllegalAccessException {
+        this.prefix = prefix;
+        checkNotNull(entityDef, "entityDef");
+        this.entityDef = entityDef;
+        checkNotNull(entityDef, "EntityDefinition");
+        this.condition = condition;
+        this.readerAfterPlan = selectQueryReader();
+    }
+
+    private void checkNotNull(Object o, String message) {
+        if (o == null) {
+            throw new IllegalArgumentException(message + " should not be null");
+        }
+    }
+
+    public EntityDefinition getEntityDefinition() {
+        return entityDef;
+    }
+
+    public SearchCondition getSearchCondition() {
+        return condition;
+    }
+
+    @Override
+    public void readAsStream() throws Exception {
+        readerAfterPlan.listeners.addAll(this.listeners);
+        readerAfterPlan.readAsStream();
+    }
+
+    private StreamReader selectQueryReader() throws InstantiationException, IllegalAccessException {
+        final ORExpression query = condition.getQueryExpression();
+        IndexDefinition[] indexDefs = entityDef.getIndexes();
 
         // Index just works with query condition
-		if (indexDefs != null && condition.getQueryExpression()!=null) {
-			List<byte[]> rowkeys = new ArrayList<>();
-			for (IndexDefinition index : indexDefs) {
-				// Check unique index first
-				if (index.isUnique()) {
-					final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
-					if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
-						LOG.info("Selectd query unique index " + index.getIndexName() + " for query: " + condition.getQueryExpression());
-						return new UniqueIndexStreamReader(index, condition, rowkeys);
-					}
-				}
-			}
-			for (IndexDefinition index : indexDefs) {
-				// Check non-clustered index
-				if (!index.isUnique()) {
-					final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
-					if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
-						LOG.info("Selectd query non clustered index " + index.getIndexName() + " for query: " + condition.getQueryExpression().toString());
-						return new NonClusteredIndexStreamReader(index, condition, rowkeys);
-					}
-				}
-			}
-		}
-		return new GenericEntityScanStreamReader(entityDef, condition, this.prefix);
-	}
-
-	@Override
-	public long getLastTimestamp() {
-		return readerAfterPlan.getLastTimestamp();
-	}
-
-	@Override
-	public long getFirstTimestamp() {
-		return readerAfterPlan.getFirstTimestamp();
-	}
+        if (indexDefs != null && condition.getQueryExpression() != null) {
+            List<byte[]> rowkeys = new ArrayList<>();
+            for (IndexDefinition index : indexDefs) {
+                // Check unique index first
+                if (index.isUnique()) {
+                    final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
+                    if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
+                        LOG.info("Selectd query unique index " + index.getIndexName() + " for query: "
+                                 + condition.getQueryExpression());
+                        return new UniqueIndexStreamReader(index, condition, rowkeys);
+                    }
+                }
+            }
+            for (IndexDefinition index : indexDefs) {
+                // Check non-clustered index
+                if (!index.isUnique()) {
+                    final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
+                    if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
+                        LOG.info("Selectd query non clustered index " + index.getIndexName() + " for query: "
+                                 + condition.getQueryExpression().toString());
+                        return new NonClusteredIndexStreamReader(index, condition, rowkeys);
+                    }
+                }
+            }
+        }
+        return new GenericEntityScanStreamReader(entityDef, condition, this.prefix);
+    }
+
+    @Override
+    public long getLastTimestamp() {
+        return readerAfterPlan.getLastTimestamp();
+    }
+
+    @Override
+    public long getFirstTimestamp() {
+        return readerAfterPlan.getFirstTimestamp();
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
index bf72a36..15bdd20 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
@@ -31,121 +31,124 @@ import org.apache.eagle.common.DateTimeUtil;
 
 /**
  * multi-threading stream readers which only applies to time-series entity where we split the query into
- * different time range
- * 
- * When this class is used together with list query or aggregate query, be aware that the query's behavior could
- * be changed for example pageSize does not work well, output sequence is not determined
+ * different time range. When this class is used together with a list query or aggregate query, be aware
+ * that the query's behavior could change: for example, pageSize does not work well and the output
+ * sequence is not deterministic.
  */
-public class GenericEntityStreamReaderMT extends StreamReader{
-	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReaderMT.class);
-	private List<GenericEntityStreamReader> readers = new ArrayList<GenericEntityStreamReader>(); 
-	
-	public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition, int numThreads) throws Exception{
-		checkIsTimeSeries(serviceName);
-		checkNumThreads(numThreads);
-		long queryStartTime = condition.getStartTime();
-		long queryEndTime = condition.getEndTime();
-		long subStartTime = queryStartTime;
-		long subEndTime = 0;
-		long interval = (queryEndTime-queryStartTime) / numThreads;
-		for(int i=0; i<numThreads; i++){
-			// split search condition by time range
-			subStartTime = queryStartTime + i*interval;
-			if(i == numThreads-1){
-				subEndTime = queryEndTime;
-			}else{
-				subEndTime = subStartTime + interval;
-			}
-			//String strStartTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime);
-			//String strEndTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime);
-			SearchCondition sc = new SearchCondition(condition);
-			sc.setStartTime(subStartTime);
-			sc.setEndTime(subEndTime);
-			GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, sc);
-			readers.add(reader);
-		}
-	}
-	
-	private void checkIsTimeSeries(String serviceName) throws Exception{
-		EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
-		if(!ed.isTimeSeries()){
-			throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table");
-		}
-	}
-	
-	private void checkNumThreads(int numThreads){
-		if(numThreads <= 0){
-			throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1");
-		}
-	}
-	
-	/**
-	 * default to 2 threads
-	 * @param serviceName
-	 * @param condition
-	 */
-	public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition) throws Exception{
-		this(serviceName, condition, 2);
-	}
-	
-	@Override
-	public void readAsStream() throws Exception{
-		// populate listeners to all readers
-		for(EntityCreationListener l : _listeners){
-			for(GenericEntityStreamReader r : readers){
-				r.register(l);
-			}
-		}
-
-		List<Future<Void>> futures = new ArrayList<Future<Void>>();
-		for(GenericEntityStreamReader r : readers){
-			SingleReader reader = new SingleReader(r);
-			Future<Void> readFuture = EagleConfigFactory.load().getExecutor().submit(reader);
-			futures.add(readFuture);
-		}
-		
-		// join threads and check exceptions
-		for(Future<Void> future : futures){
-			try{
-				future.get();
-			}catch(Exception ex){
-				LOG.error("Error in read", ex);
-				throw ex;
-			}
-		}
-	}
-	
-	private static class SingleReader implements Callable<Void>{
-		private GenericEntityStreamReader reader;
-		public SingleReader(GenericEntityStreamReader reader){
-			this.reader = reader;
-		}
-		@Override
-		public Void call() throws Exception{
-			reader.readAsStream();
-			return null;
-		}
-	}
-
-	@Override
-	public long getLastTimestamp() {
-		long lastTimestamp = 0;
-		for (GenericEntityStreamReader reader : readers) {
-			if (lastTimestamp < reader.getLastTimestamp()) {
-				lastTimestamp = reader.getLastTimestamp();
-			}
-		}
-		return lastTimestamp;
-	}
-
-	@Override
-	public long getFirstTimestamp() {
-		long firstTimestamp = 0;
-		for (GenericEntityStreamReader reader : readers) {
-			if (firstTimestamp > reader.getLastTimestamp() || firstTimestamp == 0) {
-				firstTimestamp = reader.getLastTimestamp();
-			}
-		}
-		return firstTimestamp;
-	}
+public class GenericEntityStreamReaderMT extends StreamReader {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReaderMT.class);
+    private List<GenericEntityStreamReader> readers = new ArrayList<GenericEntityStreamReader>();
+
+    public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition, int numThreads)
+        throws Exception {
+        checkIsTimeSeries(serviceName);
+        checkNumThreads(numThreads);
+        long queryStartTime = condition.getStartTime();
+        long queryEndTime = condition.getEndTime();
+        long subStartTime = queryStartTime;
+        long subEndTime = 0;
+        long interval = (queryEndTime - queryStartTime) / numThreads;
+        for (int i = 0; i < numThreads; i++) {
+            // split search condition by time range
+            subStartTime = queryStartTime + i * interval;
+            if (i == numThreads - 1) {
+                subEndTime = queryEndTime;
+            } else {
+                subEndTime = subStartTime + interval;
+            }
+            // String strStartTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime);
+            // String strEndTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime);
+            SearchCondition sc = new SearchCondition(condition);
+            sc.setStartTime(subStartTime);
+            sc.setEndTime(subEndTime);
+            GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, sc);
+            readers.add(reader);
+        }
+    }
+
+    private void checkIsTimeSeries(String serviceName) throws Exception {
+        EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+        if (!ed.isTimeSeries()) {
+            throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table");
+        }
+    }
+
+    private void checkNumThreads(int numThreads) {
+        if (numThreads <= 0) {
+            throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1");
+        }
+    }
+
+    /**
+     * default to 2 threads
+     *
+     * @param serviceName
+     * @param condition
+     */
+    public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition) throws Exception {
+        this(serviceName, condition, 2);
+    }
+
+    @Override
+    public void readAsStream() throws Exception {
+        // populate listeners to all readers
+        for (EntityCreationListener l : listeners) {
+            for (GenericEntityStreamReader r : readers) {
+                r.register(l);
+            }
+        }
+
+        List<Future<Void>> futures = new ArrayList<Future<Void>>();
+        for (GenericEntityStreamReader r : readers) {
+            SingleReader reader = new SingleReader(r);
+            Future<Void> readFuture = EagleConfigFactory.load().getExecutor().submit(reader);
+            futures.add(readFuture);
+        }
+
+        // join threads and check exceptions
+        for (Future<Void> future : futures) {
+            try {
+                future.get();
+            } catch (Exception ex) {
+                LOG.error("Error in read", ex);
+                throw ex;
+            }
+        }
+    }
+
+    private static class SingleReader implements Callable<Void> {
+        private GenericEntityStreamReader reader;
+
+        public SingleReader(GenericEntityStreamReader reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            reader.readAsStream();
+            return null;
+        }
+    }
+
+    @Override
+    public long getLastTimestamp() {
+        long lastTimestamp = 0;
+        for (GenericEntityStreamReader reader : readers) {
+            if (lastTimestamp < reader.getLastTimestamp()) {
+                lastTimestamp = reader.getLastTimestamp();
+            }
+        }
+        return lastTimestamp;
+    }
+
+    @Override
+    public long getFirstTimestamp() {
+        long firstTimestamp = 0;
+        for (GenericEntityStreamReader reader : readers) {
+            if (firstTimestamp > reader.getLastTimestamp() || firstTimestamp == 0) {
+                firstTimestamp = reader.getLastTimestamp();
+            }
+        }
+        return firstTimestamp;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
index 5c8b12d..926fcba 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
@@ -27,52 +27,53 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class GenericEntityWriter {
-	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityWriter.class);
-	private EntityDefinition entityDef;
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityWriter.class);
+    private EntityDefinition entityDef;
 
-	public GenericEntityWriter(String serviceName) throws InstantiationException, IllegalAccessException{
-		this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
-		checkNotNull(entityDef, "serviceName");
-	}
+    public GenericEntityWriter(String serviceName) throws InstantiationException, IllegalAccessException {
+        this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+        checkNotNull(entityDef, "serviceName");
+    }
 
-	public GenericEntityWriter(EntityDefinition entityDef) throws InstantiationException, IllegalAccessException{
-		this.entityDef = entityDef;
-		checkNotNull(entityDef, "serviceName");
-	}
-	
-	private void checkNotNull(Object o, String message) {
-		if(o == null){
-			throw new IllegalArgumentException(message + " should not be null");
-		}
-	}
+    public GenericEntityWriter(EntityDefinition entityDef)
+        throws InstantiationException, IllegalAccessException {
+        this.entityDef = entityDef;
+        checkNotNull(entityDef, "serviceName");
+    }
 
-	/**
-	 * @param entities
-	 * @return row keys
-	 * @throws Exception
-	 */
-	public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws Exception{
-		HBaseLogWriter writer = new HBaseLogWriter(entityDef.getTable(), entityDef.getColumnFamily());
-		List<String> rowkeys = new ArrayList<String>(entities.size());
-		List<InternalLog> logs = new ArrayList<InternalLog>(entities.size());
-		
-		try{
-			writer.open();
-			for(TaggedLogAPIEntity entity : entities){
-				final InternalLog entityLog = HBaseInternalLogHelper.convertToInternalLog(entity, entityDef);
-				logs.add(entityLog);
-			}
-			List<byte[]> bRowkeys  = writer.write(logs);
-			for (byte[] rowkey : bRowkeys) {
-				rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
-			}
+    private void checkNotNull(Object o, String message) {
+        if (o == null) {
+            throw new IllegalArgumentException(message + " should not be null");
+        }
+    }
 
-		}catch(Exception ex){
-			LOG.error("fail writing tagged log", ex);
-			throw ex;
-		}finally{
-			writer.close();
-	 	}
-		return rowkeys;
-	}
+    /**
+     * @param entities
+     * @return row keys
+     * @throws Exception
+     */
+    public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws Exception {
+        HBaseLogWriter writer = new HBaseLogWriter(entityDef.getTable(), entityDef.getColumnFamily());
+        List<String> rowkeys = new ArrayList<String>(entities.size());
+        List<InternalLog> logs = new ArrayList<InternalLog>(entities.size());
+
+        try {
+            writer.open();
+            for (TaggedLogAPIEntity entity : entities) {
+                final InternalLog entityLog = HBaseInternalLogHelper.convertToInternalLog(entity, entityDef);
+                logs.add(entityLog);
+            }
+            List<byte[]> bRowkeys = writer.write(logs);
+            for (byte[] rowkey : bRowkeys) {
+                rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
+            }
+
+        } catch (Exception ex) {
+            LOG.error("fail writing tagged log", ex);
+            throw ex;
+        } finally {
+            writer.close();
+        }
+        return rowkeys;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
index 9f6937b..56cd453 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
@@ -21,33 +21,35 @@ import org.apache.eagle.log.entity.meta.*;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 /**
- * GenericMetricEntity should use prefix field which is extended from TaggedLogAPIEntity as metric name
- * metric name is used to partition the metric tables
+ * GenericMetricEntity should use the prefix field inherited from TaggedLogAPIEntity as the metric name.
+ * The metric name is used to partition the metric tables.
  */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("eagle_metric")
 @ColumnFamily("f")
 @Prefix(GenericMetricEntity.GENERIC_METRIC_PREFIX_PLACE_HOLDER)
 @Service(GenericMetricEntity.GENERIC_METRIC_SERVICE)
 @TimeSeries(true)
-@Metric(interval=60000)
+@Metric(interval = 60000)
 @ServicePath(path = "/metric")
 // TODO:
-@Tags({"site","application","policyId","alertExecutorId", "streamName","source","partitionSeq"})
+@Tags({
+       "site", "application", "policyId", "alertExecutorId", "streamName", "source", "partitionSeq"
+    })
 public class GenericMetricEntity extends TaggedLogAPIEntity {
-	public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
-	public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = "GENERIC_METRIC_PREFIX_PLACEHODLER";
-	public static final String VALUE_FIELD ="value";
+    public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
+    public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = "GENERIC_METRIC_PREFIX_PLACEHODLER";
+    public static final String VALUE_FIELD = "value";
 
-	@Column("a")
-	private double[] value;
+    @Column("a")
+    private double[] value;
 
-	public double[] getValue() {
-		return value;
-	}
+    public double[] getValue() {
+        return value;
+    }
 
-	public void setValue(double[] value) {
-		this.value = value;
-		pcs.firePropertyChange("value", null, null);
-	}
-}
\ No newline at end of file
+    public void setValue(double[] value) {
+        this.value = value;
+        pcs.firePropertyChange("value", null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
index 84b02ae..bc99a81 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
@@ -23,32 +23,37 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-public class GenericMetricEntityBatchReader  implements EntityCreationListener{
-	private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class);
-	
-	private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
-	private GenericEntityStreamReader reader;
-	
-	public GenericMetricEntityBatchReader(String metricName, SearchCondition condition) throws Exception{
-		reader = new GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, condition, metricName);
-	}
-	
-	public long getLastTimestamp() {
-		return reader.getLastTimestamp();
-	}
-	public long getFirstTimestamp() {
-		return reader.getFirstTimestamp();
-	}
-	@Override
-	public void entityCreated(TaggedLogAPIEntity entity){
-		entities.add(entity);
-	}
-	
-	@SuppressWarnings("unchecked")
-	public <T> List<T> read() throws Exception{
-		if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch mode");
-		reader.register(this);
-		reader.readAsStream();
-		return (List<T>)entities;
-	}
+public class GenericMetricEntityBatchReader implements EntityCreationListener {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class);
+
+    private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+    private GenericEntityStreamReader reader;
+
+    public GenericMetricEntityBatchReader(String metricName, SearchCondition condition) throws Exception {
+        reader = new GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, condition,
+                                               metricName);
+    }
+
+    public long getLastTimestamp() {
+        return reader.getLastTimestamp();
+    }
+
+    public long getFirstTimestamp() {
+        return reader.getFirstTimestamp();
+    }
+
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) {
+        entities.add(entity);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> List<T> read() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Start reading as batch mode");
+        }
+        reader.register(this);
+        reader.readAsStream();
+        return (List<T>)entities;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
index 1cf3905..216022f 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
@@ -25,74 +25,79 @@ import org.slf4j.LoggerFactory;
 
 import java.text.ParseException;
 
-public class GenericMetricEntityDecompactionStreamReader extends StreamReader implements EntityCreationListener{
-	@SuppressWarnings("unused")
-	private static final Logger LOG = LoggerFactory.getLogger(GenericMetricEntityDecompactionStreamReader.class);
-	private GenericEntityStreamReader reader;
-	private EntityDefinition ed;
-	private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE;
-	private long start;
-	private long end;
-	private GenericMetricShadowEntity single = new GenericMetricShadowEntity();
-	
-	/**
-	 * it makes sense that serviceName should not be provided while metric name should be provided as prefix
-	 * @param metricName
-	 * @param condition
-	 * @throws InstantiationException
-	 * @throws IllegalAccessException
-	 * @throws ParseException
-	 */
-	public GenericMetricEntityDecompactionStreamReader(String metricName, SearchCondition condition) throws InstantiationException, IllegalAccessException, ParseException{
-		ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
-		checkIsMetric(ed);
-		reader = new GenericEntityStreamReader(serviceName, condition, metricName);
-		start = condition.getStartTime();
-		end = condition.getEndTime();
-	}
-	
-	private void checkIsMetric(EntityDefinition ed){
-		if(ed.getMetricDefinition() == null)
-			throw new IllegalArgumentException("Only metric entity comes here");
-	}
-	
-	@Override
-	public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
-		GenericMetricEntity e = (GenericMetricEntity)entity;
-		double[] value = e.getValue();
-		if(value != null) {
-			int count =value.length;
-			@SuppressWarnings("unused")
-			Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
-			for (int i = 0; i < count; i++) {
-				long ts = entity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
-				// exclude those entity which is not within the time range in search condition. [start, end)
-				if (ts < start || ts >= end) {
-					continue;
-				}
-				single.setTimestamp(ts);
-				single.setTags(entity.getTags());
-				single.setValue(e.getValue()[i]);
-				for (EntityCreationListener l : _listeners) {
-					l.entityCreated(single);
-				}
-			}
-		}
-	}
-	
-	@Override
-	public void readAsStream() throws Exception{
-		reader.register(this);
-		reader.readAsStream();
-	}
+public class GenericMetricEntityDecompactionStreamReader extends StreamReader
+    implements EntityCreationListener {
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory
+        .getLogger(GenericMetricEntityDecompactionStreamReader.class);
+    private GenericEntityStreamReader reader;
+    private EntityDefinition ed;
+    private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE;
+    private long start;
+    private long end;
+    private GenericMetricShadowEntity single = new GenericMetricShadowEntity();
 
-	@Override
-	public long getLastTimestamp() {
-		return reader.getLastTimestamp();
-	}
+    /**
+     * The service name is fixed to the generic metric service; only the metric name is supplied, and it is used as the prefix.
+     *
+     * @param metricName
+     * @param condition
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws ParseException
+     */
+    public GenericMetricEntityDecompactionStreamReader(String metricName, SearchCondition condition)
+        throws InstantiationException, IllegalAccessException, ParseException {
+        ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+        checkIsMetric(ed);
+        reader = new GenericEntityStreamReader(serviceName, condition, metricName);
+        start = condition.getStartTime();
+        end = condition.getEndTime();
+    }
 
-	@Override
-	public long getFirstTimestamp() {
-		return reader.getFirstTimestamp();
-	}
-}
\ No newline at end of file
+    private void checkIsMetric(EntityDefinition ed) {
+        if (ed.getMetricDefinition() == null) {
+            throw new IllegalArgumentException("Only metric entity comes here");
+        }
+    }
+
+    @Override
+    public void entityCreated(TaggedLogAPIEntity entity) throws Exception {
+        GenericMetricEntity e = (GenericMetricEntity)entity;
+        double[] value = e.getValue();
+        if (value != null) {
+            int count = value.length;
+            @SuppressWarnings("unused")
+            Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
+            for (int i = 0; i < count; i++) {
+                long ts = entity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
+                // exclude entities that are not within the search condition's time range [start, end)
+                if (ts < start || ts >= end) {
+                    continue;
+                }
+                single.setTimestamp(ts);
+                single.setTags(entity.getTags());
+                single.setValue(e.getValue()[i]);
+                for (EntityCreationListener l : listeners) {
+                    l.entityCreated(single);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void readAsStream() throws Exception {
+        reader.register(this);
+        reader.readAsStream();
+    }
+
+    @Override
+    public long getLastTimestamp() {
+        return reader.getLastTimestamp();
+    }
+
+    @Override
+    public long getFirstTimestamp() {
+        return reader.getFirstTimestamp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
index acd1290..8ead7cd 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
@@ -22,13 +22,13 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
  * just a shadow class to avoid dynamically create the class and instantiate using reflection
  */
 public class GenericMetricShadowEntity extends TaggedLogAPIEntity {
-	private double value;
+    private double value;
 
-	public double getValue() {
-		return value;
-	}
+    public double getValue() {
+        return value;
+    }
 
-	public void setValue(double value) {
-		this.value = value;
-	}
+    public void setValue(double value) {
+        this.value = value;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
index 6869c7c..97f538c 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
@@ -35,24 +35,27 @@ import java.util.Map;
  */
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
-@XmlType(propOrder = {"success","exception","meta","type","obj"})
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@XmlType(propOrder = {
+                      "success", "exception", "meta", "type", "obj"
+    })
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonDeserialize(using = GenericServiceAPIResponseEntityDeserializer.class)
-@JsonIgnoreProperties(ignoreUnknown=true)
-public class GenericServiceAPIResponseEntity<T>{
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GenericServiceAPIResponseEntity<T> {
     /**
      * Please use primitive type of value in meta as possible
      */
-    private Map<String,Object> meta;
-	private boolean success;
-	private String exception;
+    private Map<String, Object> meta;
+    private boolean success;
+    private String exception;
     private List<T> obj;
     private Class<T> type;
 
-    public GenericServiceAPIResponseEntity(){
+    public GenericServiceAPIResponseEntity() {
         // default constructor
     }
-    public GenericServiceAPIResponseEntity(Class<T> type){
+
+    public GenericServiceAPIResponseEntity(Class<T> type) {
         this.setType(type);
     }
 
@@ -72,7 +75,7 @@ public class GenericServiceAPIResponseEntity<T>{
         this.obj = obj;
     }
 
-    public void setObj(List<T> obj,Class<T> type) {
+    public void setObj(List<T> obj, Class<T> type) {
         this.setObj(obj);
         this.setType(type);
     }
@@ -85,10 +88,10 @@ public class GenericServiceAPIResponseEntity<T>{
      * Set the first object's class as type
      */
     @SuppressWarnings("unused")
-    public void setTypeByObj(){
-        for(T t:this.obj){
-            if(this.type == null && t!=null){
-                this.type = (Class<T>) t.getClass();
+    public void setTypeByObj() {
+        for (T t : this.obj) {
+            if (this.type == null && t != null) {
+                this.type = (Class<T>)t.getClass();
             }
         }
     }
@@ -102,17 +105,19 @@ public class GenericServiceAPIResponseEntity<T>{
         this.type = type;
     }
 
-	public boolean isSuccess() {
-		return success;
-	}
-	public void setSuccess(boolean success) {
-		this.success = success;
-	}
-	public String getException() {
-		return exception;
-	}
-
-    public void setException(Exception exceptionObj){
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public void setException(Exception exceptionObj) {
         this.exception = EagleExceptionWrapper.wrap(exceptionObj);
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
index 836295b..8ccb43a 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
@@ -30,57 +30,61 @@ import java.io.IOException;
 import java.util.*;
 
 /**
- * @since 3/18/15
+ * @since 3/18/15.
  */
-public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserializer<GenericServiceAPIResponseEntity> {
-    private final static String META_FIELD="meta";
-    private final static String SUCCESS_FIELD="success";
-    private final static String EXCEPTION_FIELD="exception";
-    private final static String OBJ_FIELD="obj";
-    private final static String TYPE_FIELD="type";
+public class GenericServiceAPIResponseEntityDeserializer
+    extends JsonDeserializer<GenericServiceAPIResponseEntity> {
+    private static final String META_FIELD = "meta";
+    private static final String SUCCESS_FIELD = "success";
+    private static final String EXCEPTION_FIELD = "exception";
+    private static final String OBJ_FIELD = "obj";
+    private static final String TYPE_FIELD = "type";
 
     @Override
-    public GenericServiceAPIResponseEntity deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+    public GenericServiceAPIResponseEntity deserialize(JsonParser jp, DeserializationContext ctxt)
+        throws IOException, JsonProcessingException {
         GenericServiceAPIResponseEntity entity = new GenericServiceAPIResponseEntity();
         ObjectCodec objectCodec = jp.getCodec();
 
         JsonNode rootNode = jp.getCodec().readTree(jp);
-        if(rootNode.isObject()){
-            Iterator<Map.Entry<String,JsonNode>> fields = rootNode.fields();
+        if (rootNode.isObject()) {
+            Iterator<Map.Entry<String, JsonNode>> fields = rootNode.fields();
             JsonNode objNode = null;
-            while(fields.hasNext()){
-                Map.Entry<String,JsonNode> field = fields.next();
-                if (META_FIELD.equals(field.getKey()) && field.getValue() != null)
+            while (fields.hasNext()) {
+                Map.Entry<String, JsonNode> field = fields.next();
+                if (META_FIELD.equals(field.getKey()) && field.getValue() != null) {
                     entity.setMeta(objectCodec.readValue(field.getValue().traverse(), Map.class));
-                else if(SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null){
+                } else if (SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null) {
                     entity.setSuccess(field.getValue().booleanValue());
-                }else if(EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null){
+                } else if (EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null) {
                     entity.setException(new Exception(field.getValue().textValue()));
-                }else if(TYPE_FIELD.endsWith(field.getKey())  && field.getValue() != null){
-                    Preconditions.checkNotNull(field.getValue().textValue(),"Response type class is null");
+                } else if (TYPE_FIELD.equals(field.getKey()) && field.getValue() != null) {
+                    Preconditions.checkNotNull(field.getValue().textValue(), "Response type class is null");
                     try {
                         entity.setType(Class.forName(field.getValue().textValue()));
                     } catch (ClassNotFoundException e) {
                         throw new IOException(e);
                     }
-                }else if(OBJ_FIELD.equals(field.getKey()) && field.getValue() != null){
+                } else if (OBJ_FIELD.equals(field.getKey()) && field.getValue() != null) {
                     objNode = field.getValue();
                 }
             }
 
-            if(objNode!=null) {
-                JavaType collectionType=null;
+            if (objNode != null) {
+                JavaType collectionType = null;
                 if (entity.getType() != null) {
-                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, entity.getType());
-                }else{
-                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, Map.class);
+                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class,
+                                                                                           entity.getType());
+                } else {
+                    collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class,
+                                                                                           Map.class);
                 }
                 List obj = objectCodec.readValue(objNode.traverse(), collectionType);
                 entity.setObj(obj);
             }
-        }else{
+        } else {
             throw new IOException("root node is not object");
         }
         return entity;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
index 7a38033..32f382b 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
@@ -30,216 +30,232 @@ import org.slf4j.LoggerFactory;
 import java.util.*;
 
 public class HBaseInternalLogHelper {
-	private final static Logger LOG  = LoggerFactory.getLogger(HBaseInternalLogHelper.class);
-
-	private static final EntitySerDeserializer ENTITY_SERDESER = new EntitySerDeserializer();
-
-	/**
-	 *
-	 * @param ed
-	 * @param r
-	 * @param qualifiers if null, return all qualifiers defined in ed
-	 * @return
-	 */
-	public static InternalLog parse(EntityDefinition ed, Result r, byte[][] qualifiers) {
-		final byte[] row = r.getRow();
-		// skip the first 4 bytes : prefix
-		final int offset = (ed.getPartitions() == null) ? (4) : (4 + ed.getPartitions().length * 4);
-		long timestamp = ByteUtil.bytesToLong(row, offset);
-		// reverse timestamp
-		timestamp = Long.MAX_VALUE - timestamp;
-		final byte[] family = ed.getColumnFamily().getBytes();
-		final Map<String, byte[]> allQualifierValues = new HashMap<String, byte[]>();
-
-		if (qualifiers != null) {
-			int count = qualifiers.length;
-			final byte[][] values = new byte[count][];
-			for (int i = 0; i < count; i++) {
-				// TODO if returned value is null, it means no this column for this row, so why set null to the object?
-				values[i] = r.getValue(family, qualifiers[i]);
-				allQualifierValues.put(new String(qualifiers[i]), values[i]);
-			}
-		}else{
-			// return all qualifiers
-			for(KeyValue kv:r.list()){
-				byte[] qualifier = kv.getQualifier();
-				byte[] value = kv.getValue();
-				allQualifierValues.put(new String(qualifier),value);
-			}
-		}
-		final InternalLog log = buildObject(ed, row, timestamp, allQualifierValues);
-		return log;
-	}
-
-	/**
-	 *
-	 * @param ed
-	 * @param row
-	 * @param timestamp
-	 * @param allQualifierValues <code>Map &lt; Qualifier name (not display name),Value in bytes array &gt;</code>
-	 * @return
-	 */
-	public static InternalLog buildObject(EntityDefinition ed, byte[] row, long timestamp, Map<String, byte[]> allQualifierValues) {
-		InternalLog log = new InternalLog();
-		String myRow = EagleBase64Wrapper.encodeByteArray2URLSafeString(row);
-		log.setEncodedRowkey(myRow);
-		log.setPrefix(ed.getPrefix());
-		log.setTimestamp(timestamp);
-
-		Map<String, byte[]> logQualifierValues = new HashMap<String, byte[]>();
-		Map<String, String> logTags = new HashMap<String, String>();
-		Map<String, Object> extra = null;
-
-		Map<String,Double> doubleMap = null;
-		// handle with metric
-		boolean isMetricEntity = GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService());
-		double[] metricValueArray = null;
-
-		for (Map.Entry<String, byte[]> entry : allQualifierValues.entrySet()) {
-			if (ed.isTag(entry.getKey())) {
-				if (entry.getValue() != null) {
-					logTags.put(entry.getKey(), new String(entry.getValue()));
-				}else if (TokenConstant.isExpression(entry.getKey())){
-					if(doubleMap == null) doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed);
-					// Caculate expression based fields
-					String expression = TokenConstant.parseExpressionContent(entry.getKey());
-					if (extra == null) extra = new HashMap<String, Object>();
-
-					// Evaluation expression as output based on entity
-					// -----------------------------------------------
-					// 1) Firstly, check whether is metric entity and expression requires value and also value is not number (i.e. double[])
-					// 2) Treat all required fields as double, if not number, then set result as NaN
-
-					try {
-						ExpressionParser parser = ExpressionParser.parse(expression);
-						boolean isRequiringValue = parser.getDependentFields().contains(GenericMetricEntity.VALUE_FIELD);
-
-						if(isMetricEntity && isRequiringValue && doubleMap.get(GenericMetricEntity.VALUE_FIELD)!=null
-								&& Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) // EntityQualifierUtils will convert non-number field into Double.NaN
-						{
-							// if dependent fields require "value"
-							// and value exists but value's type is double[] instead of double
-
-							// handle with metric value array based expression
-							// lazily extract metric value as double array if required
-							if(metricValueArray == null){
-								// if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){
-								Qualifier qualifier = ed.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
-								EntitySerDeser serDeser = qualifier.getSerDeser();
-								if(serDeser instanceof DoubleArraySerDeser){
-									byte[] value = allQualifierValues.get(qualifier.getQualifierName());
-									if(value !=null ) metricValueArray = (double[]) serDeser.deserialize(value);
-								}
-								// }
-							}
-
-							if(metricValueArray!=null){
-								double[] resultBucket = new double[metricValueArray.length];
-								Map<String, Double> _doubleMap = new HashMap<String,Double>(doubleMap);
-								_doubleMap.remove(entry.getKey());
-								for(int i=0;i< resultBucket.length;i++) {
-									_doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]);
-									resultBucket[i]=  parser.eval(_doubleMap);
-								}
-								extra.put(expression,resultBucket);
-							}else{
-								LOG.warn("Failed convert metric value into double[] type which is required by expression: "+expression);
-								// if require value in double[] is NaN
-								double value = parser.eval(doubleMap);
-								extra.put(expression, value);
-							}
-						}else {
-							double value = parser.eval(doubleMap);
-							extra.put(expression, value);
-							// LOG.info("DEBUG: "+entry.getKey()+" = "+ value);
-						}
-					} catch (Exception e) {
-						LOG.error("Failed to eval expression "+expression+", exception: "+e.getMessage(),e);
-					}
-				}
-			} else {
-				logQualifierValues.put(entry.getKey(),entry.getValue());
-			}
-		}
-		log.setQualifierValues(logQualifierValues);
-		log.setTags(logTags);
-		log.setExtraValues(extra);
-		return log;
-	}
-	
-	public static TaggedLogAPIEntity buildEntity(InternalLog log, EntityDefinition entityDef) throws Exception {
-		Map<String, byte[]> qualifierValues = log.getQualifierValues();
-		TaggedLogAPIEntity entity = ENTITY_SERDESER.readValue(qualifierValues, entityDef);
-		if (entity.getTags() == null && log.getTags() != null) {
-			entity.setTags(log.getTags());
-		}
-		entity.setExp(log.getExtraValues());
-		entity.setTimestamp(log.getTimestamp());
-		entity.setEncodedRowkey(log.getEncodedRowkey());
-		entity.setPrefix(log.getPrefix());
-		return entity;
-	}
-	
-	public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> logs, EntityDefinition entityDef) throws Exception {
-		final List<TaggedLogAPIEntity> result = new ArrayList<TaggedLogAPIEntity>(logs.size());
-		for (InternalLog log : logs) {
-			result.add(buildEntity(log, entityDef));
-		}
-		return result;
-	}
-	
-	public static byte[][] getOutputQualifiers(EntityDefinition entityDef, List<String> outputFields) {
-		final byte[][] result = new byte[outputFields.size()][];
-		int index = 0;
-		for(String field : outputFields){
-			// convert displayName to qualifierName
-			Qualifier q = entityDef.getDisplayNameMap().get(field);
-			if(q == null){ // for tag case
-				result[index++] = field.getBytes();
-			}else{ // for qualifier case
-				result[index++] = q.getQualifierName().getBytes();
-			}
-		}
-		return result;
-	}
-
-	public static InternalLog convertToInternalLog(TaggedLogAPIEntity entity, EntityDefinition entityDef) throws Exception {
-		final InternalLog log = new InternalLog();
-		final Map<String, String> inputTags = entity.getTags();
-		final Map<String, String> tags = new TreeMap<String, String>();
-		if(inputTags!=null) {
-			for (Map.Entry<String, String> entry : inputTags.entrySet()) {
-				tags.put(entry.getKey(), entry.getValue());
-			}
-		}
-		log.setTags(tags);
-		if(entityDef.isTimeSeries()){
-			log.setTimestamp(entity.getTimestamp());
-		}else{
-			log.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, then actually stored 0
-		}
-		
-		// For Metric entity, prefix is populated along with entity instead of EntityDefinition
-		if(entity.getPrefix() != null && !entity.getPrefix().isEmpty()){
-			log.setPrefix(entity.getPrefix());
-		}else{
-			log.setPrefix(entityDef.getPrefix());
-		}
-		
-		log.setPartitions(entityDef.getPartitions());
-		EntitySerDeserializer des = new EntitySerDeserializer();
-		log.setQualifierValues(des.writeValue(entity, entityDef));
-		
-		final IndexDefinition[] indexDefs = entityDef.getIndexes();
-		if (indexDefs != null) {
-			final List<byte[]> indexRowkeys = new ArrayList<byte[]>();
-			for (int i = 0; i < indexDefs.length; ++i) {
-				final IndexDefinition indexDef = indexDefs[i];
-				final byte[] indexRowkey = indexDef.generateIndexRowkey(entity);
-				indexRowkeys.add(indexRowkey);
-			}
-			log.setIndexRowkeys(indexRowkeys);
-		}
-		return log;
-	}
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseInternalLogHelper.class);
+
+    private static final EntitySerDeserializer ENTITY_SERDESER = new EntitySerDeserializer();
+
+    /**
+     * @param ed
+     * @param r
+     * @param qualifiers if null, return all qualifiers defined in ed
+     * @return
+     */
+    public static InternalLog parse(EntityDefinition ed, Result r, byte[][] qualifiers) {
+        final byte[] row = r.getRow();
+        // skip the first 4 bytes : prefix
+        final int offset = (ed.getPartitions() == null) ? (4) : (4 + ed.getPartitions().length * 4);
+        long timestamp = ByteUtil.bytesToLong(row, offset);
+        // reverse timestamp
+        timestamp = Long.MAX_VALUE - timestamp;
+        final byte[] family = ed.getColumnFamily().getBytes();
+        final Map<String, byte[]> allQualifierValues = new HashMap<String, byte[]>();
+
+        if (qualifiers != null) {
+            int count = qualifiers.length;
+            final byte[][] values = new byte[count][];
+            for (int i = 0; i < count; i++) {
+                // TODO if the returned value is null, this row has no such column, so why still set null on
+                // the object?
+                values[i] = r.getValue(family, qualifiers[i]);
+                allQualifierValues.put(new String(qualifiers[i]), values[i]);
+            }
+        } else {
+            // return all qualifiers
+            for (KeyValue kv : r.list()) {
+                byte[] qualifier = kv.getQualifier();
+                byte[] value = kv.getValue();
+                allQualifierValues.put(new String(qualifier), value);
+            }
+        }
+        final InternalLog log = buildObject(ed, row, timestamp, allQualifierValues);
+        return log;
+    }
+
+    /**
+     * @param ed
+     * @param row
+     * @param timestamp
+     * @param allQualifierValues
+     *            <code>Map &lt; Qualifier name (not display name),Value in bytes array &gt;</code>
+     * @return
+     */
+    public static InternalLog buildObject(EntityDefinition ed, byte[] row, long timestamp,
+                                          Map<String, byte[]> allQualifierValues) {
+        InternalLog log = new InternalLog();
+        String myRow = EagleBase64Wrapper.encodeByteArray2URLSafeString(row);
+        log.setEncodedRowkey(myRow);
+        log.setPrefix(ed.getPrefix());
+        log.setTimestamp(timestamp);
+
+        Map<String, byte[]> logQualifierValues = new HashMap<String, byte[]>();
+        Map<String, String> logTags = new HashMap<String, String>();
+        Map<String, Object> extra = null;
+
+        Map<String, Double> doubleMap = null;
+        // handle with metric
+        boolean isMetricEntity = GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService());
+        double[] metricValueArray = null;
+
+        for (Map.Entry<String, byte[]> entry : allQualifierValues.entrySet()) {
+            if (ed.isTag(entry.getKey())) {
+                if (entry.getValue() != null) {
+                    logTags.put(entry.getKey(), new String(entry.getValue()));
+                } else if (TokenConstant.isExpression(entry.getKey())) {
+                    if (doubleMap == null) {
+                        doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed);
+                    }
+                    // Calculate expression-based fields
+                    String expression = TokenConstant.parseExpressionContent(entry.getKey());
+                    if (extra == null) {
+                        extra = new HashMap<String, Object>();
+                    }
+
+                    // Evaluate expression as output based on entity
+                    // -----------------------------------------------
+                    // 1) First, check whether this is a metric entity whose expression requires value and
+                    // whose value is not a number (i.e. double[])
+                    // 2) Treat all required fields as double; if one is not a number, set the result to NaN
+
+                    try {
+                        ExpressionParser parser = ExpressionParser.parse(expression);
+                        boolean isRequiringValue = parser.getDependentFields()
+                            .contains(GenericMetricEntity.VALUE_FIELD);
+
+                        if (isMetricEntity && isRequiringValue
+                            && doubleMap.get(GenericMetricEntity.VALUE_FIELD) != null
+                            && Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) {
+                            // EntityQualifierUtils will convert non-number field into Double.NaN
+                            // if dependent fields require "value"
+                            // and value exists but value's type is double[] instead of double
+
+                            // handle with metric value array based expression
+                            // lazily extract metric value as double array if required
+                            if (metricValueArray == null) {
+                                // if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){
+                                Qualifier qualifier = ed.getDisplayNameMap()
+                                    .get(GenericMetricEntity.VALUE_FIELD);
+                                EntitySerDeser serDeser = qualifier.getSerDeser();
+                                if (serDeser instanceof DoubleArraySerDeser) {
+                                    byte[] value = allQualifierValues.get(qualifier.getQualifierName());
+                                    if (value != null) {
+                                        metricValueArray = (double[])serDeser.deserialize(value);
+                                    }
+                                }
+                                // }
+                            }
+
+                            if (metricValueArray != null) {
+                                double[] resultBucket = new double[metricValueArray.length];
+                                Map<String, Double> _doubleMap = new HashMap<String, Double>(doubleMap);
+                                _doubleMap.remove(entry.getKey());
+                                for (int i = 0; i < resultBucket.length; i++) {
+                                    _doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]);
+                                    resultBucket[i] = parser.eval(_doubleMap);
+                                }
+                                extra.put(expression, resultBucket);
+                            } else {
+                                LOG.warn("Failed convert metric value into double[] type which is required by expression: "
+                                         + expression);
+                                // if require value in double[] is NaN
+                                double value = parser.eval(doubleMap);
+                                extra.put(expression, value);
+                            }
+                        } else {
+                            double value = parser.eval(doubleMap);
+                            extra.put(expression, value);
+                            // LOG.info("DEBUG: "+entry.getKey()+" = "+ value);
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Failed to eval expression " + expression + ", exception: "
+                                  + e.getMessage(), e);
+                    }
+                }
+            } else {
+                logQualifierValues.put(entry.getKey(), entry.getValue());
+            }
+        }
+        log.setQualifierValues(logQualifierValues);
+        log.setTags(logTags);
+        log.setExtraValues(extra);
+        return log;
+    }
+
+    /**
+     * Rebuilds a {@link TaggedLogAPIEntity} from an {@link InternalLog}.
+     *
+     * <p>The qualifier values are deserialized through {@code ENTITY_SERDESER}; the log's tags are used
+     * only when the deserialized entity carries none. Extra values, timestamp, encoded rowkey and prefix
+     * are then copied from the log onto the entity.
+     *
+     * @param log the internal log to convert
+     * @param entityDef the entity definition handed to the deserializer
+     * @return the populated entity
+     * @throws Exception propagated from the calls made here, e.g. when deserialization fails
+     */
+    public static TaggedLogAPIEntity buildEntity(InternalLog log, EntityDefinition entityDef)
+        throws Exception {
+        final TaggedLogAPIEntity result = ENTITY_SERDESER.readValue(log.getQualifierValues(), entityDef);
+        final Map<String, String> logTags = log.getTags();
+        if (logTags != null && result.getTags() == null) {
+            result.setTags(logTags);
+        }
+        result.setExp(log.getExtraValues());
+        result.setTimestamp(log.getTimestamp());
+        result.setEncodedRowkey(log.getEncodedRowkey());
+        result.setPrefix(log.getPrefix());
+        return result;
+    }
+
+    /**
+     * Converts every log in {@code logs} with {@link #buildEntity(InternalLog, EntityDefinition)},
+     * keeping the iteration order of the input list.
+     *
+     * @param logs the internal logs to convert
+     * @param entityDef the entity definition used for each conversion
+     * @return a new list holding one entity per input log
+     * @throws Exception propagated from {@code buildEntity}
+     */
+    public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> logs, EntityDefinition entityDef)
+        throws Exception {
+        final List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>(logs.size());
+        for (final InternalLog current : logs) {
+            final TaggedLogAPIEntity converted = buildEntity(current, entityDef);
+            entities.add(converted);
+        }
+        return entities;
+    }
+
+    /**
+     * Translates output field names into the byte form of their column names.
+     *
+     * <p>A field found in the entity definition's display-name map is replaced by its qualifier name;
+     * a field that is not found (the tag case) is used as-is. Strings are encoded with the platform
+     * default charset, exactly as {@link String#getBytes()} does.
+     *
+     * @param entityDef the entity definition providing the display-name map
+     * @param outputFields the requested output field names
+     * @return one byte array per requested field, in the same order
+     */
+    public static byte[][] getOutputQualifiers(EntityDefinition entityDef, List<String> outputFields) {
+        final byte[][] qualifiers = new byte[outputFields.size()][];
+        int pos = 0;
+        for (final String displayName : outputFields) {
+            // convert displayName to qualifierName; unmapped names are tags and stay unchanged
+            final Qualifier mapped = entityDef.getDisplayNameMap().get(displayName);
+            final String columnName = (mapped == null) ? displayName : mapped.getQualifierName();
+            qualifiers[pos] = columnName.getBytes();
+            pos++;
+        }
+        return qualifiers;
+    }
+
+    /**
+     * Converts an entity into the {@link InternalLog} form that is written to storage.
+     *
+     * <p>Tags are copied into a sorted map, the timestamp and prefix are resolved, partitions are taken
+     * from the entity definition, the entity is serialized into qualifier values and, when the entity
+     * definition declares indexes, one index rowkey is generated per index definition.
+     *
+     * @param entity the entity to convert
+     * @param entityDef the definition describing how the entity is stored
+     * @return the populated internal log
+     * @throws Exception propagated from serialization or index rowkey generation
+     */
+    public static InternalLog convertToInternalLog(TaggedLogAPIEntity entity, EntityDefinition entityDef)
+        throws Exception {
+        final InternalLog internalLog = new InternalLog();
+
+        // copy the tags into a sorted map; a missing tag map becomes an empty one
+        final Map<String, String> sortedTags = new TreeMap<String, String>();
+        final Map<String, String> entityTags = entity.getTags();
+        if (entityTags != null) {
+            sortedTags.putAll(entityTags);
+        }
+        internalLog.setTags(sortedTags);
+
+        // non time-series entities get the fixed write timestamp
+        // (original note: set timestamp to MAX, then actually stored 0)
+        internalLog.setTimestamp(entityDef.isTimeSeries()
+            ? entity.getTimestamp()
+            : EntityConstants.FIXED_WRITE_TIMESTAMP);
+
+        // For Metric entity, prefix is populated along with entity instead of EntityDefinition
+        final String entityPrefix = entity.getPrefix();
+        final boolean hasOwnPrefix = entityPrefix != null && !entityPrefix.isEmpty();
+        internalLog.setPrefix(hasOwnPrefix ? entityPrefix : entityDef.getPrefix());
+
+        internalLog.setPartitions(entityDef.getPartitions());
+        final EntitySerDeserializer serializer = new EntitySerDeserializer();
+        internalLog.setQualifierValues(serializer.writeValue(entity, entityDef));
+
+        final IndexDefinition[] indexes = entityDef.getIndexes();
+        if (indexes != null) {
+            final List<byte[]> rowkeys = new ArrayList<byte[]>();
+            for (final IndexDefinition index : indexes) {
+                rowkeys.add(index.generateIndexRowkey(entity));
+            }
+            internalLog.setIndexRowkeys(rowkeys);
+        }
+        return internalLog;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
index c8b9a33..d5c8e2c 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
@@ -28,59 +28,62 @@ import java.util.Date;
 import java.util.List;
 
 public class HBaseLogReader2 extends AbstractHBaseLogReader<InternalLog> {
-	protected ResultScanner rs;
+    protected ResultScanner rs; // assigned in onOpen(), closed in close(); read() requires it to be non-null
 
-	public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] outputQualifiers) {
-		super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers);
-	}
+    /**
+     * Creates a reader without an explicit prefix; all arguments are forwarded unchanged to the
+     * superclass constructor.
+     *
+     * @param ed entity definition
+     * @param partitions partition values
+     * @param startTime start time of the query
+     * @param endTime end time of the query
+     * @param filter filter for the hbase scan
+     * @param lastScanKey the key of last scan
+     * @param outputQualifiers the bytes of output qualifier names
+     */
+    public HBaseLogReader2(EntityDefinition ed,
+                           List<String> partitions,
+                           Date startTime,
+                           Date endTime,
+                           Filter filter,
+                           String lastScanKey,
+                           byte[][] outputQualifiers) {
+        super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers);
+    }
 
-	/**
-	 * This constructor supports partition.
-	 *
-	 * @param ed               entity definition
-	 * @param partitions       partition values, which is sorted in partition definition order. TODO: in future we need to support
-	 *                         multiple values for one partition field
-	 * @param startTime        start time of the query
-	 * @param endTime          end time of the query
-	 * @param filter           filter for the hbase scan
-	 * @param lastScanKey      the key of last scan
-	 * @param outputQualifiers the bytes of output qualifier names
-	 * @param prefix           can be populated from outside world specifically for generic metric reader
-	 */
-	public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] outputQualifiers, String prefix) {
-		super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix);
-	}
+    /**
+     * Creates a reader with partition support and an explicit prefix; all arguments are forwarded
+     * unchanged to the superclass constructor.
+     *
+     * @param ed entity definition
+     * @param partitions partition values, sorted in partition definition order. TODO: in future we
+     *            need to support multiple values for one partition field
+     * @param startTime start time of the query
+     * @param endTime end time of the query
+     * @param filter filter for the hbase scan
+     * @param lastScanKey the key of last scan
+     * @param outputQualifiers the bytes of output qualifier names
+     * @param prefix can be populated from outside world specifically for generic metric reader
+     */
+    public HBaseLogReader2(EntityDefinition ed,
+                           List<String> partitions,
+                           Date startTime,
+                           Date endTime,
+                           Filter filter,
+                           String lastScanKey,
+                           byte[][] outputQualifiers,
+                           String prefix) {
+        super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix);
+    }
 
-	@Override
-	protected void onOpen(HTableInterface tbl, Scan scan) throws IOException {
-		rs = tbl.getScanner(scan);
-	}
+    /**
+     * Opens the scanner for {@code scan} on {@code tbl} and keeps it in {@link #rs} for later reads.
+     *
+     * @param tbl the table to scan
+     * @param scan the scan to run
+     * @throws IOException if the scanner cannot be obtained
+     */
+    @Override
+    protected void onOpen(HTableInterface tbl, Scan scan) throws IOException {
+        this.rs = tbl.getScanner(scan);
+    }
 
-	/**
-	 * <h2>Close:</h2>
-	 * 1. Call super.close(): release current table connection <br></br>
-	 * 2. Close Scanner<br></br>
-	 *
-	 * @throws IOException
-	 */
-	@Override
-	public void close() throws IOException {
-		super.close();
-		if(rs != null){
-			rs.close();
-		}
-	}
+    /**
+     * Releases the resources held by this reader.
+     *
+     * <ol>
+     * <li>Calls {@code super.close()} (per the original documentation: releases the current table
+     * connection).</li>
+     * <li>Closes the {@link ResultScanner}, if one was opened.</li>
+     * </ol>
+     *
+     * <p>The scanner is closed in a {@code finally} block so that it is not leaked when
+     * {@code super.close()} throws.
+     *
+     * @throws IOException if {@code super.close()} fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            super.close();
+        } finally {
+            // rs stays null when the reader was never opened
+            if (rs != null) {
+                rs.close();
+            }
+        }
+    }
 
-	@Override
-	public InternalLog read() throws IOException {
-		if (rs == null)
-			throw new IllegalArgumentException(
-					"ResultScanner must be initialized before reading");
-		InternalLog t = null;
-		Result r = rs.next();
-		if (r != null) {
-			t = HBaseInternalLogHelper.parse(_ed, r, qualifiers);
-		}
-		return t;
-	}
+    /**
+     * Reads the next row from the scanner and parses it into an {@link InternalLog}.
+     *
+     * @return the parsed log, or {@code null} when the scanner has no further row
+     * @throws IOException if fetching or parsing the row fails
+     * @throws IllegalArgumentException if the scanner has not been initialized
+     */
+    @Override
+    public InternalLog read() throws IOException {
+        if (rs == null) {
+            throw new IllegalArgumentException("ResultScanner must be initialized before reading");
+        }
+        final Result row = rs.next();
+        return (row == null) ? null : HBaseInternalLogHelper.parse(ed, row, qualifiers);
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
index 059ee7f..1cf23b6 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
@@ -29,124 +29,125 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HBaseLogWriter implements LogWriter {
-	private static Logger LOG = LoggerFactory.getLogger(HBaseLogWriter.class);
-	private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes();
-	
-	private HTableInterface tbl;
-	private String table;
-	private String columnFamily;
-	
-	public HBaseLogWriter(String table, String columnFamily) {
-		// TODO assert for non-null of table and columnFamily
-		this.table = table;
-		this.columnFamily = columnFamily;
-	}
-	
-	@Override
-	public void open() throws IOException {
-		try{
-			tbl = EagleConfigFactory.load().getHTable(this.table);
-//			LOGGER.info("HBase table " + table + " audo reflush is " + (tbl.isAutoFlush() ? "enabled" : "disabled"));
-		}catch(Exception ex){
-			LOG.error("Cannot create htable", ex);
-			throw new IOException(ex);
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		if(tbl != null){
-			new HTableFactory().releaseHTableInterface(tbl);
-		}
-	}
-
-	@Override
-	public void flush() throws IOException {
-		tbl.flushCommits();
-	}
-	
-	protected void populateColumnValues(Put p, InternalLog log){
-		Map<String, byte[]> qualifierValues = log.getQualifierValues();
-		// iterate all qualifierValues
-		for(Map.Entry<String, byte[]> entry : qualifierValues.entrySet()){
-			p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue());
-		}
-		
-		Map<String, String> tags = log.getTags();
-		// iterate all tags, each tag will be stored as a column qualifier
-		if(tags != null){
-			for(Map.Entry<String, String> entry : tags.entrySet()){
-				// TODO need a consistent handling of null values
-				if(entry.getValue() != null)
-					p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue().getBytes());
-			}
-		}
-	}
-
-	/**
-	 * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
-	 */
-	@Override
-	public byte[] write(InternalLog log) throws IOException{
-		final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
-		final Put p = new Put(rowkey);
-		populateColumnValues(p, log);
-		tbl.put(p);
-		final List<byte[]> indexRowkeys = log.getIndexRowkeys();
-		if (indexRowkeys != null) {
-			writeIndexes(rowkey, indexRowkeys);
-		}
-		return rowkey;
-	}
-
-	/**
-	 * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
-	 */
-	public List<byte[]> write(List<InternalLog> logs) throws IOException{
-		final List<Put> puts = new ArrayList<Put>(logs.size());
-		final List<byte[]> result = new ArrayList<byte[]>(logs.size());
-		for (InternalLog log : logs) {
-			final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
-			final Put p = new Put(rowkey);
-			populateColumnValues(p, log);
-			puts.add(p);
-			final List<byte[]> indexRowkeys = log.getIndexRowkeys();
-			if (indexRowkeys != null) {
-				writeIndexes(rowkey, indexRowkeys, puts);
-			}
-			result.add(rowkey);
-		}
-		tbl.put(puts);
-		return result;
-	}
-	
-	@Override
-	public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException{
-		Put p = new Put(rowkey);
-		populateColumnValues(p, log);
-		tbl.put(p);
-		final List<byte[]> indexRowkeys = log.getIndexRowkeys();
-		if (indexRowkeys != null) {
-			writeIndexes(rowkey, indexRowkeys);
-		}
-	}
-
-	private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) throws IOException {
-		for (byte[] indexRowkey : indexRowkeys) {
-			Put p = new Put(indexRowkey);
-			p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
-			tbl.put(p);
-		}
-	}
-
-	private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, List<Put> puts) throws IOException {
-		for (byte[] indexRowkey : indexRowkeys) {
-			Put p = new Put(indexRowkey);
-			p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
-			puts.add(p);
-//			tbl.put(p);
-		}
-	}
-
-	
+    private static Logger LOG = LoggerFactory.getLogger(HBaseLogWriter.class);
+    private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes();
+
+    private HTableInterface tbl;
+    private String table;
+    private String columnFamily;
+
+    /**
+     * Creates a writer for the given table and column family. The table itself is only looked up
+     * when {@link #open()} is called.
+     *
+     * @param table name of the table to write to
+     * @param columnFamily column family that receives every cell written by this writer
+     * @throws NullPointerException if {@code table} or {@code columnFamily} is {@code null}
+     */
+    public HBaseLogWriter(String table, String columnFamily) {
+        // fail fast here instead of with an anonymous NullPointerException on the first write;
+        // fully qualified so that no additional import is needed
+        this.table = java.util.Objects.requireNonNull(table, "table");
+        this.columnFamily = java.util.Objects.requireNonNull(columnFamily, "columnFamily");
+    }
+
+    @Override
+    public void open() throws IOException {
+        try {
+            tbl = EagleConfigFactory.load().getHTable(this.table);
+            // LOGGER.info("HBase table " + table + " audo reflush is " + (tbl.isAutoFlush() ? "enabled" :
+            // "disabled"));
+        } catch (Exception ex) {
+            LOG.error("Cannot create htable", ex);
+            throw new IOException(ex);
+        }
+    }
+
+    /**
+     * Releases the table handle through a new {@code HTableFactory}; does nothing when the writer
+     * was never opened.
+     *
+     * @throws IOException if releasing the table fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (tbl == null) {
+            return;
+        }
+        final HTableFactory factory = new HTableFactory();
+        factory.releaseHTableInterface(tbl);
+    }
+
+    /**
+     * Flushes the commits buffered on the table handle. The writer must have been opened first.
+     *
+     * @throws IOException if the flush fails
+     */
+    @Override
+    public void flush() throws IOException {
+        this.tbl.flushCommits();
+    }
+
+    protected void populateColumnValues(Put p, InternalLog log) {
+        Map<String, byte[]> qualifierValues = log.getQualifierValues();
+        // iterate all qualifierValues
+        for (Map.Entry<String, byte[]> entry : qualifierValues.entrySet()) {
+            p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue());
+        }
+
+        Map<String, String> tags = log.getTags();
+        // iterate all tags, each tag will be stored as a column qualifier
+        if (tags != null) {
+            for (Map.Entry<String, String> entry : tags.entrySet()) {
+                // TODO need a consistent handling of null values
+                if (entry.getValue() != null) {
+                    p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue().getBytes());
+                }
+            }
+        }
+    }
+
+    /**
+     * Writes a single log: one put for the entity row, then one put per index rowkey (if any).
+     *
+     * <p>TODO need think about if multi-PUT is necessary, by checking if autoFlush works
+     *
+     * @param log the log to write
+     * @return the rowkey built for the entity row
+     * @throws IOException if any put fails
+     */
+    @Override
+    public byte[] write(InternalLog log) throws IOException {
+        final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
+        final Put entityPut = new Put(rowkey);
+        populateColumnValues(entityPut, log);
+        tbl.put(entityPut);
+
+        final List<byte[]> indexKeys = log.getIndexRowkeys();
+        if (indexKeys == null) {
+            return rowkey;
+        }
+        writeIndexes(rowkey, indexKeys);
+        return rowkey;
+    }
+
+    /**
+     * Writes several logs with a single batched put that contains the entity rows and their index
+     * rows.
+     *
+     * <p>TODO need think about if multi-PUT is necessary, by checking if autoFlush works
+     *
+     * @param logs the logs to write
+     * @return the entity rowkeys, in the order of {@code logs}
+     * @throws IOException if the batched put fails
+     */
+    public List<byte[]> write(List<InternalLog> logs) throws IOException {
+        final int expected = logs.size();
+        final List<Put> batch = new ArrayList<Put>(expected);
+        final List<byte[]> rowkeys = new ArrayList<byte[]>(expected);
+        for (final InternalLog entry : logs) {
+            final byte[] rowkey = RowkeyBuilder.buildRowkey(entry);
+            final Put entityPut = new Put(rowkey);
+            populateColumnValues(entityPut, entry);
+            batch.add(entityPut);
+
+            // index puts are queued right behind the entity put they belong to
+            final List<byte[]> indexKeys = entry.getIndexRowkeys();
+            if (indexKeys != null) {
+                writeIndexes(rowkey, indexKeys, batch);
+            }
+            rowkeys.add(rowkey);
+        }
+        tbl.put(batch);
+        return rowkeys;
+    }
+
+    /**
+     * Writes the columns of {@code log} to the row identified by {@code rowkey}, then writes the
+     * log's index rows (if any).
+     *
+     * @param rowkey the rowkey of the row to update
+     * @param log the log supplying column values, tags and index rowkeys
+     * @throws IOException if any put fails
+     */
+    @Override
+    public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException {
+        final Put update = new Put(rowkey);
+        populateColumnValues(update, log);
+        tbl.put(update);
+
+        final List<byte[]> indexKeys = log.getIndexRowkeys();
+        if (indexKeys == null) {
+            return;
+        }
+        writeIndexes(rowkey, indexKeys);
+    }
+
+    private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) throws IOException {
+        for (byte[] indexRowkey : indexRowkeys) {
+            Put p = new Put(indexRowkey);
+            p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
+            tbl.put(p);
+        }
+    }
+
+    private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, List<Put> puts) throws IOException {
+        for (byte[] indexRowkey : indexRowkeys) {
+            Put p = new Put(indexRowkey);
+            p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
+            puts.add(p);
+            // tbl.put(p);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
index 8276640..066401f 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
@@ -25,115 +25,134 @@ import java.util.Map;
  * TODO we should decouple BaseLog during write time and BaseLog during read time
  */
 public class InternalLog {
-	private String encodedRowkey;
-	private String prefix;
-	private String[] partitions;
-	private long timestamp;
-	private Map<String, byte[]> qualifierValues;
-
-	private Map<String,Object> extraValues;
-	private Map<String, String> tags;
-	private Map<String, List<String>> searchTags;
-	private List<byte[]> indexRowkeys;
-
-	public String getEncodedRowkey() {
-		return encodedRowkey;
-	}
-
-	public void setEncodedRowkey(String encodedRowkey) {
-		this.encodedRowkey = encodedRowkey;
-	}
-	
-	public Map<String, byte[]> getQualifierValues() {
-		return qualifierValues;
-	}
-	public void setQualifierValues(Map<String, byte[]> qualifierValues) {
-		this.qualifierValues = qualifierValues;
-	}
-
-	public Map<String, List<String>> getSearchTags() {
-		return searchTags;
-	}
-	public void setSearchTags(Map<String, List<String>> searchTags) {
-		this.searchTags = searchTags;
-	}
-	public String getPrefix() {
-		return prefix;
-	}
-	public void setPrefix(String prefix) {
-		this.prefix = prefix;
-	}
-	public String[] getPartitions() {
-		return partitions;
-	}
-	public void setPartitions(String[] partitions) {
-		this.partitions = partitions;
-	}
-	public long getTimestamp() {
-		return timestamp;
-	}
-	public void setTimestamp(long timestamp) {
-		this.timestamp = timestamp;
-	}
-	public Map<String, String> getTags() {
-		return tags;
-	}
-	public void setTags(Map<String, String> tags) {
-		this.tags = tags;
-	}
-	public List<byte[]> getIndexRowkeys() {
-		return indexRowkeys;
-	}
-	public void setIndexRowkeys(List<byte[]> indexRowkeys) {
-		this.indexRowkeys = indexRowkeys;
-	}
-	public Map<String, Object> getExtraValues() { return extraValues; }
-	public void setExtraValues(Map<String, Object> extraValues) { this.extraValues = extraValues; }
-
-	public String toString(){
-		StringBuffer sb = new StringBuffer();
-		sb.append(prefix);
-		sb.append("|");
-		sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp));
-		sb.append("(");
-		sb.append(timestamp);
-		sb.append(")");
-		sb.append("|searchTags:");
-		if(searchTags != null){
-			for(String tagkey : searchTags.keySet()){
-				sb.append(tagkey);
-				sb.append('=');
-				List<String> tagValues = searchTags.get(tagkey);
-				sb.append("(");
-				for(String tagValue : tagValues){
-					sb.append(tagValue);
-					sb.append(",");
-				}
-				sb.append(")");
-				sb.append(",");
-			}
-		}
-		sb.append("|tags:");
-		if(tags != null){
-			for(Map.Entry<String, String> entry : tags.entrySet()){
-				sb.append(entry.getKey());
-				sb.append("=");
-				sb.append(entry.getValue());
-				sb.append(",");
-			}
-		}
-		sb.append("|columns:");
-		if(qualifierValues != null){
-			for(String qualifier : qualifierValues.keySet()){
-				byte[] value = qualifierValues.get(qualifier);
-				sb.append(qualifier);
-				sb.append("=");
-				if(value != null){
-					sb.append(new String(value));
-				}
-				sb.append(",");
-			}
-		}
-		return sb.toString();
-	}
+    // --- state; every field is exposed through a plain getter/setter pair below ---
+    private String encodedRowkey;
+    private String prefix;
+    private String[] partitions;
+    // rendered by toString() as a millisecond timestamp
+    private long timestamp;
+    // raw column bytes keyed by qualifier name
+    private Map<String, byte[]> qualifierValues;
+
+    private Map<String, Object> extraValues;
+    private Map<String, String> tags;
+    private Map<String, List<String>> searchTags;
+    private List<byte[]> indexRowkeys;
+
+    public String getEncodedRowkey() {
+        return this.encodedRowkey;
+    }
+
+    public void setEncodedRowkey(String encodedRowkey) {
+        this.encodedRowkey = encodedRowkey;
+    }
+
+    public String getPrefix() {
+        return this.prefix;
+    }
+
+    public void setPrefix(String prefix) {
+        this.prefix = prefix;
+    }
+
+    public String[] getPartitions() {
+        return this.partitions;
+    }
+
+    public void setPartitions(String[] partitions) {
+        this.partitions = partitions;
+    }
+
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public Map<String, byte[]> getQualifierValues() {
+        return this.qualifierValues;
+    }
+
+    public void setQualifierValues(Map<String, byte[]> qualifierValues) {
+        this.qualifierValues = qualifierValues;
+    }
+
+    public Map<String, Object> getExtraValues() {
+        return this.extraValues;
+    }
+
+    public void setExtraValues(Map<String, Object> extraValues) {
+        this.extraValues = extraValues;
+    }
+
+    public Map<String, String> getTags() {
+        return this.tags;
+    }
+
+    public void setTags(Map<String, String> tags) {
+        this.tags = tags;
+    }
+
+    public Map<String, List<String>> getSearchTags() {
+        return this.searchTags;
+    }
+
+    public void setSearchTags(Map<String, List<String>> searchTags) {
+        this.searchTags = searchTags;
+    }
+
+    public List<byte[]> getIndexRowkeys() {
+        return this.indexRowkeys;
+    }
+
+    public void setIndexRowkeys(List<byte[]> indexRowkeys) {
+        this.indexRowkeys = indexRowkeys;
+    }
+
+    /**
+     * Renders this log for diagnostics in the form
+     * {@code prefix|humanDate(timestamp)|searchTags:k=(v,v,),|tags:k=v,|columns:q=value,}.
+     *
+     * <p>Sections whose backing map is {@code null} are left empty, and a search tag without a
+     * value list is rendered as {@code k=(),}. Column bytes are decoded with the platform default
+     * charset, so binary values may not be readable.
+     *
+     * @return the textual form of this log
+     */
+    @Override
+    public String toString() {
+        // StringBuilder: the buffer never leaves this method, so no synchronization is needed
+        final StringBuilder sb = new StringBuilder();
+        sb.append(prefix);
+        sb.append("|");
+        sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp));
+        sb.append("(").append(timestamp).append(")");
+
+        sb.append("|searchTags:");
+        if (searchTags != null) {
+            for (Map.Entry<String, List<String>> searchTag : searchTags.entrySet()) {
+                sb.append(searchTag.getKey()).append('=').append("(");
+                final List<String> tagValues = searchTag.getValue();
+                // tolerate a key that is mapped to no value list
+                if (tagValues != null) {
+                    for (String tagValue : tagValues) {
+                        sb.append(tagValue).append(",");
+                    }
+                }
+                sb.append(")").append(",");
+            }
+        }
+
+        sb.append("|tags:");
+        if (tags != null) {
+            for (Map.Entry<String, String> tag : tags.entrySet()) {
+                sb.append(tag.getKey()).append("=").append(tag.getValue()).append(",");
+            }
+        }
+
+        sb.append("|columns:");
+        if (qualifierValues != null) {
+            for (Map.Entry<String, byte[]> column : qualifierValues.entrySet()) {
+                sb.append(column.getKey()).append("=");
+                final byte[] value = column.getValue();
+                if (value != null) {
+                    sb.append(new String(value));
+                }
+                sb.append(",");
+            }
+        }
+        return sb.toString();
+    }
 }