You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/07 12:30:54 UTC

[5/8] incubator-eagle git commit: [EAGLE-520] Fix and decouple co-processor from eagle aggregation query service

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
index 53d27de..7ef8b80 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateProtocolEndPoint.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.storage.hbase.query.coprocessor;
 
+import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.*;
 import org.apache.eagle.log.entity.meta.EntityDefinition;
@@ -25,7 +26,7 @@ import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue;
 import org.apache.eagle.query.aggregate.raw.RawAggregator;
 import org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator;
 import org.apache.eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
-import org.apache.eagle.common.DateTimeUtil;
+
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -49,8 +50,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-//public abstract class AbstractAggregateEndPoint extends BaseEndpointCoprocessor{
 public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol implements AggregateProtocol, Coprocessor, CoprocessorService {
+    private static final Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class);
 
     private RegionCoprocessorEnvironment env;
 
@@ -59,16 +60,19 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol
         return this;
     }
 
-    public AggregateProtocolEndPoint() {}
+    public AggregateProtocolEndPoint() {
+    }
 
-    protected void checkNotNull(Object obj,String name) {
-		if(obj==null) throw new NullPointerException(name+" is null");
-	}
+    protected void checkNotNull(Object obj, String name) {
+        if (obj == null) {
+            throw new NullPointerException(name + " is null");
+        }
+    }
 
     @Override
     public void start(CoprocessorEnvironment env) throws IOException {
         if (env instanceof RegionCoprocessorEnvironment) {
-            this.env = (RegionCoprocessorEnvironment)env;
+            this.env = (RegionCoprocessorEnvironment) env;
         } else {
             throw new CoprocessorException("Must be loaded on a table region!");
         }
@@ -79,302 +83,296 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol
         // do nothing
     }
 
-//    @Override
-//	public ProtocolSignature getProtocolSignature(String protocol, long version, int clientMethodsHashCode) throws IOException {
-//		if (AggregateProtocol.class.getName().equals(protocol)) {
-////			return new ProtocolSignature(AggregateProtocol.VERSION, null);
-//			return new ProtocolSignature(98l, null);
-//		}
-//		throw new IOException("Unknown protocol: " + protocol);
-//	}
-
-	protected HRegion getCurrentRegion(){
-		return this.env.getRegion();
-	}
-
-	/**
-	 * <pre>
-	 * Region-unittest,\x82\xB4\x85\xC2\x7F\xFF\xFE\xB6\xC9jNG\xEE!\x5C3\xBB\xAE\xA1:\x05\xA5\xA9x\xB0\xA1"8\x05\xFB(\xD2VY\xDB\x9A\x06\x09\xA9\x98\xC2\xE3\x8D=,1413960230654.aaf2a6c9f2c87c196f43497243bb2424.
-	 * RegionID-unittest,1413960230654
-	 * </pre>
-	 */
-	protected String getLogHeader(){
-		HRegion region = this.getCurrentRegion();
-		return LOG.isDebugEnabled() ? String.format("Region-%s",region.getRegionNameAsString()):
-				String.format("Region-%s,%d",region.getTableDesc().getNameAsString(),region.getRegionId());
-	}
-
-	protected class InternalReadReport {
-		public InternalReadReport(long counter,long startTimestamp,long stopTimestamp){
-			this.counter = counter;
-			this.startTimestamp = startTimestamp;
-			this.stopTimestamp = stopTimestamp;
-		}
-		public long getCounter() {
-			return counter;
-		}
-		public void setCounter(long counter) {
-			this.counter = counter;
-		}
-
-		public long getStartTimestamp() {
-			return startTimestamp;
-		}
-
-		public void setStartTimestamp(long startTimestamp) {
-			this.startTimestamp = startTimestamp;
-		}
-
-		public long getStopTimestamp() {
-			return stopTimestamp;
-		}
-
-		public void setStopTimestamp(long stopTimestamp) {
-			this.stopTimestamp = stopTimestamp;
-		}
-
-		private long counter;
-		private long startTimestamp;
-		private long stopTimestamp;
-	}
-
-	/**
-	 * Asynchronous HBase scan read as entity
-	 *
-	 * @param scan
-	 * @throws java.io.IOException
-	 */
-	protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, EntityCreationListener listener) throws IOException {
-//		_init();
-		long counter = 0;
-		long startTimestamp = 0;
-		long stopTimestamp = 0;
-
-		InternalScanner scanner = this.getCurrentRegion().getScanner(scan);
-		List<Cell> results = new ArrayList<Cell>();
-		try{
-			boolean hasMoreRows;
-			GenericMetricShadowEntity singleMetricEntity = null;
-			do{
-				hasMoreRows = scanner.next(results);
-				Map<String, byte[]> kvMap = new HashMap<String, byte[]>();
-				if(!results.isEmpty()){
-					counter ++;
-					byte[] row = results.get(0).getRow();
-					long timestamp = RowkeyBuilder.getTimestamp(row, ed);
-					
-					// Min
-					if(startTimestamp == 0 || startTimestamp > timestamp ){
-						startTimestamp = timestamp;
-					}
-					
-					// Max
-					if(stopTimestamp == 0 || stopTimestamp < timestamp ){
-						stopTimestamp = timestamp;
-					}
-					
-					for(Cell kv:results){
-						String qualifierName = Bytes.toString(kv.getQualifier());
-//						Qualifier qualifier = null;
-//						if(!ed.isTag(qualifierName)){
-//							qualifier = ed.getQualifierNameMap().get(qualifierName);
-//							if(qualifier == null){
-//								LOG.error("qualifier for   " + qualifierName + " not exist");
-//								throw new NullPointerException("qualifier for field "+qualifierName+" not exist");
-//							}
-//						}
-						if(kv.getValue()!=null) kvMap.put(qualifierName ,kv.getValue());
-					}
-					
-					// LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]");
-					
-					InternalLog internalLog = HBaseInternalLogHelper.buildObject(ed, row, timestamp, kvMap);
-					if(internalLog!=null){
-						TaggedLogAPIEntity logAPIEntity = null;
-						try {
-							logAPIEntity = HBaseInternalLogHelper.buildEntity(internalLog, ed);
-							if(logAPIEntity instanceof GenericMetricEntity){
-								if(singleMetricEntity == null) singleMetricEntity = new GenericMetricShadowEntity();
-								GenericMetricEntity e = (GenericMetricEntity)logAPIEntity;
-								if(e.getValue()!=null) {
-									int count = e.getValue().length;
-									@SuppressWarnings("unused")
-									Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
-									for (int i = 0; i < count; i++) {
-										long ts = logAPIEntity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
-										// exclude those entity which is not within the time range in search condition. [start, end)
-										singleMetricEntity.setTimestamp(ts);
-										singleMetricEntity.setTags(e.getTags());
-										singleMetricEntity.setValue(e.getValue()[i]);
-										// Min
-										if (startTimestamp == 0 || startTimestamp > ts) startTimestamp = ts;
-										// Max
-										if (stopTimestamp == 0 || stopTimestamp < ts) stopTimestamp = ts;
-										listener.entityCreated(singleMetricEntity);
-									}
-								}
-							}else {
-								// LOG.info("DEBUG: rowKey="+logAPIEntity.getEncodedRowkey());
-								listener.entityCreated(logAPIEntity);
-							}
-						} catch (Exception e) {
-							if(internalLog!=null) {
-								LOG.error("Got exception to handle " + internalLog.toString() + ": " + e.getMessage(), e);
-							}
-							throw new IOException(e);
-						}
-					}else{
-						LOG.error("Got null to parse internal log for row: " + row.length + " with fields: " + kvMap);
-					}
-					results.clear();
-				}else{
-					if(LOG.isDebugEnabled()) LOG.warn("Empty batch of KeyValue");
-				}
-			} while(hasMoreRows);
-		}catch(IOException ex){
-			LOG.error(ex.getMessage(),ex);
-			throw ex;
-		} finally {
-            if(scanner != null) {
+    protected HRegion getCurrentRegion() {
+        return this.env.getRegion();
+    }
+
+    /**
+     * <pre>
+     * Region-unittest
+     *  ,\x82\xB4\x85\xC2\x7F\xFF\xFE\xB6\xC9jNG\xEE!\x5C3\xBB\xAE\xA1
+     *  :\x05\xA5\xA9x\xB0\xA1"8\x05\xFB(\xD2VY\xDB\x9A\x06\x09\xA9\x98\xC2\xE3\x8D=,1413960230654.aaf2a6c9f2c87c196f43497243bb2424.
+     *
+     * RegionID-unittest,1413960230654
+     * </pre>
+     */
+    protected String getLogHeader() {
+        HRegion region = this.getCurrentRegion();
+        return LOG.isDebugEnabled() ? String.format("Region-%s", region.getRegionNameAsString()) :
+            String.format("Region-%s,%d", region.getTableDesc().getNameAsString(), region.getRegionId());
+    }
+
+    protected class InternalReadReport {
+        public InternalReadReport(long counter, long startTimestamp, long stopTimestamp) {
+            this.counter = counter;
+            this.startTimestamp = startTimestamp;
+            this.stopTimestamp = stopTimestamp;
+        }
+
+        public long getCounter() {
+            return counter;
+        }
+
+        public void setCounter(long counter) {
+            this.counter = counter;
+        }
+
+        public long getStartTimestamp() {
+            return startTimestamp;
+        }
+
+        public void setStartTimestamp(long startTimestamp) {
+            this.startTimestamp = startTimestamp;
+        }
+
+        public long getStopTimestamp() {
+            return stopTimestamp;
+        }
+
+        public void setStopTimestamp(long stopTimestamp) {
+            this.stopTimestamp = stopTimestamp;
+        }
+
+        private long counter;
+        private long startTimestamp;
+        private long stopTimestamp;
+    }
+
+    /**
+     * Asynchronous HBase scan read as entity.
+     */
+    protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, EntityCreationListener listener) throws IOException {
+        long counter = 0;
+        long startTimestamp = 0;
+        long stopTimestamp = 0;
+
+        InternalScanner scanner = this.getCurrentRegion().getScanner(scan);
+        List<Cell> results = new ArrayList<Cell>();
+        try {
+            boolean hasMoreRows;
+            GenericMetricShadowEntity singleMetricEntity = null;
+            do {
+                hasMoreRows = scanner.next(results);
+                Map<String, byte[]> kvMap = new HashMap<String, byte[]>();
+                if (!results.isEmpty()) {
+                    counter++;
+                    byte[] row = results.get(0).getRow();
+                    long timestamp = RowkeyBuilder.getTimestamp(row, ed);
+
+                    // Min
+                    if (startTimestamp == 0 || startTimestamp > timestamp) {
+                        startTimestamp = timestamp;
+                    }
+
+                    // Max
+                    if (stopTimestamp == 0 || stopTimestamp < timestamp) {
+                        stopTimestamp = timestamp;
+                    }
+
+                    for (Cell kv : results) {
+                        String qualifierName = Bytes.toString(kv.getQualifier());
+                        // Qualifier qualifier = null;
+                        // if(!ed.isTag(qualifierName)){
+                        //  qualifier = ed.getQualifierNameMap().get(qualifierName);
+                        //  if(qualifier == null){
+                        //      LOG.error("qualifier for   " + qualifierName + " not exist");
+                        //      throw new NullPointerException("qualifier for field "+qualifierName+" not exist");
+                        //  }
+                        // }
+                        if (kv.getValue() != null) {
+                            kvMap.put(qualifierName, kv.getValue());
+                        }
+                    }
+
+                    // LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]");
+
+                    InternalLog internalLog = HBaseInternalLogHelper.buildObject(ed, row, timestamp, kvMap);
+                    if (internalLog != null) {
+                        TaggedLogAPIEntity logAPIEntity = null;
+                        try {
+                            logAPIEntity = HBaseInternalLogHelper.buildEntity(internalLog, ed);
+                            if (logAPIEntity instanceof GenericMetricEntity) {
+                                if (singleMetricEntity == null) {
+                                    singleMetricEntity = new GenericMetricShadowEntity();
+                                }
+                                GenericMetricEntity e = (GenericMetricEntity) logAPIEntity;
+                                if (e.getValue() != null) {
+                                    int count = e.getValue().length;
+                                    @SuppressWarnings("unused")
+                                    Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
+                                    for (int i = 0; i < count; i++) {
+                                        long ts = logAPIEntity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
+                                        // exclude those entity which is not within the time range in search condition. [start, end)
+                                        singleMetricEntity.setTimestamp(ts);
+                                        singleMetricEntity.setTags(e.getTags());
+                                        singleMetricEntity.setValue(e.getValue()[i]);
+                                        // Min
+                                        if (startTimestamp == 0 || startTimestamp > ts) {
+                                            startTimestamp = ts;
+                                        }
+                                        // Max
+                                        if (stopTimestamp == 0 || stopTimestamp < ts) {
+                                            stopTimestamp = ts;
+                                        }
+                                        listener.entityCreated(singleMetricEntity);
+                                    }
+                                }
+                            } else {
+                                // LOG.info("DEBUG: rowKey="+logAPIEntity.getEncodedRowkey());
+                                listener.entityCreated(logAPIEntity);
+                            }
+                        } catch (Exception e) {
+                            if (internalLog != null) {
+                                LOG.error("Got exception to handle " + internalLog.toString() + ": " + e.getMessage(), e);
+                            }
+                            throw new IOException(e);
+                        }
+                    } else {
+                        LOG.error("Got null to parse internal log for row: " + row.length + " with fields: " + kvMap);
+                    }
+                    results.clear();
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.warn("Empty batch of KeyValue");
+                    }
+                }
+            }
+            while (hasMoreRows);
+        } catch (IOException ex) {
+            LOG.error(ex.getMessage(), ex);
+            throw ex;
+        } finally {
+            if (scanner != null) {
                 scanner.close();
             }
-		}
-		return new InternalReadReport(counter,startTimestamp,stopTimestamp);
-	}
-
-	/**
-	 * Asynchronous HBase scan read as RAW qualifier
-	 *
-	 * @param scan
-	 * @param listener
-	 * @throws Exception
-	 */
-	protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, QualifierCreationListener listener) throws IOException {
-//		_init();
-		long counter = 0;
-		long startTimestamp = 0;
-		long stopTimestamp = 0;
-		InternalScanner scanner = this.getCurrentRegion().getScanner(scan);
-		List<Cell> results = new ArrayList<Cell>();
-		try{
-			boolean hasMoreRows;//false by default
-			do{
-				hasMoreRows = scanner.next(results);
-				Map<String, byte[]> kvMap = new HashMap<String, byte[]>();
-				if(!results.isEmpty()){
-					counter ++;
-					byte[] row = results.get(0).getRow();
-//					if(ed.isTimeSeries()){
-					long timestamp = RowkeyBuilder.getTimestamp(row,ed);
-					// Min
-					if(startTimestamp == 0 || startTimestamp > timestamp ){
-						startTimestamp = timestamp;
-					}
-					// Max
-					if(stopTimestamp == 0 || stopTimestamp < timestamp ){
-						stopTimestamp = timestamp;
-					}
-//					}
-					
-					for(Cell kv:results){
-						String qualifierName = Bytes.toString(kv.getQualifier());
-						Qualifier qualifier = null;
-						if(!ed.isTag(qualifierName)){
-							qualifier = ed.getQualifierNameMap().get(qualifierName);
-							if(qualifier == null){
-								LOG.error("qualifier for field " + qualifierName + " not exist");
-								throw new IOException(new NullPointerException("qualifier for field "+qualifierName+" is null"));
-							}
-							qualifierName = qualifier.getDisplayName();
-						}
-						if(kv.getValue()!=null) kvMap.put(qualifierName,kv.getValue());
-					}
-					
-//					LOG.info("DEBUG: timestamp="+timestamp+", keys=["+StringUtils.join(kvMap.keySet(),",")+"]");
-
-					if(!kvMap.isEmpty()) listener.qualifierCreated(kvMap);
-					results.clear();
-				}else{
-					if(LOG.isDebugEnabled()) LOG.warn("Empty batch of KeyValue");
-				}
-			} while(hasMoreRows);
-		} catch(IOException ex){
-			LOG.error(ex.getMessage(),ex);
-			throw ex;
-		} finally {
-            if(scanner != null) {
+        }
+        return new InternalReadReport(counter, startTimestamp, stopTimestamp);
+    }
+
+    /**
+     * Asynchronous HBase scan read as RAW qualifier.
+     */
+    protected InternalReadReport asyncStreamRead(EntityDefinition ed, Scan scan, QualifierCreationListener listener) throws IOException {
+        long counter = 0;
+        long startTimestamp = 0;
+        long stopTimestamp = 0;
+        InternalScanner scanner = this.getCurrentRegion().getScanner(scan);
+        List<Cell> results = new ArrayList<Cell>();
+        try {
+            boolean hasMoreRows;//false by default
+            do {
+                hasMoreRows = scanner.next(results);
+                Map<String, byte[]> kvMap = new HashMap<>();
+                if (!results.isEmpty()) {
+                    counter++;
+                    byte[] row = results.get(0).getRow();
+                    long timestamp = RowkeyBuilder.getTimestamp(row, ed);
+                    // Min
+                    if (startTimestamp == 0 || startTimestamp > timestamp) {
+                        startTimestamp = timestamp;
+                    }
+                    // Max
+                    if (stopTimestamp == 0 || stopTimestamp < timestamp) {
+                        stopTimestamp = timestamp;
+                    }
+
+                    for (Cell kv : results) {
+                        String qualifierName = Bytes.toString(kv.getQualifier());
+                        Qualifier qualifier = null;
+                        if (!ed.isTag(qualifierName)) {
+                            qualifier = ed.getQualifierNameMap().get(qualifierName);
+                            if (qualifier == null) {
+                                LOG.error("qualifier for field " + qualifierName + " not exist");
+                                throw new IOException(new NullPointerException("qualifier for field " + qualifierName + " is null"));
+                            }
+                            qualifierName = qualifier.getDisplayName();
+                        }
+                        if (kv.getValue() != null) {
+                            kvMap.put(qualifierName, kv.getValue());
+                        }
+                    }
+
+                    if (!kvMap.isEmpty()) {
+                        listener.qualifierCreated(kvMap);
+                    }
+                    results.clear();
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.warn("Empty batch of KeyValue");
+                    }
+                }
+            }
+            while (hasMoreRows);
+        } catch (IOException ex) {
+            LOG.error(ex.getMessage(), ex);
+            throw ex;
+        } finally {
+            if (scanner != null) {
                 scanner.close();
             }
-		}
+        }
 
-		return new InternalReadReport(counter,startTimestamp,stopTimestamp);
-	}
+        return new InternalReadReport(counter, startTimestamp, stopTimestamp);
+    }
 
     @Override
-    public void aggregate(RpcController controller, AggregateProtos.AggregateRequest request, RpcCallback<AggregateProtos.AggregateResult> done) {
+    public void timeseriesAggregate(RpcController controller, AggregateProtos.TimeSeriesAggregateRequest request, RpcCallback<AggregateProtos.AggregateResult> done) {
         AggregateResult result = null;
         try {
             result = this.aggregate(ProtoBufConverter.fromPBEntityDefinition(request.getEntityDefinition()),
-                    ProtoBufConverter.fromPBScan(request.getScan()),
-                    ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()),
-                    ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()),
-                    ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList())
+                ProtoBufConverter.fromPBScan(request.getScan()),
+                ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()),
+                ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()),
+                ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList()),
+                request.getStartTime(),
+                request.getEndTime(),
+                request.getIntervalMin()
             );
         } catch (IOException e) {
+            LOG.error("Failed to convert result to PB-based message", e);
             ResponseConverter.setControllerException(controller, e);
         }
         try {
             done.run(ProtoBufConverter.toPBAggregateResult(result));
         } catch (IOException e) {
-            throw new RuntimeException("Failed to convert result to PB-based message",e);
+            LOG.error("Failed to convert result to PB-based message", e);
+            ResponseConverter.setControllerException(controller, e);
         }
     }
 
     @Override
-    public void timeseriesAggregate(RpcController controller, AggregateProtos.TimeSeriesAggregateRequest request, RpcCallback<AggregateProtos.AggregateResult> done) {
+    public void aggregate(RpcController controller, AggregateProtos.AggregateRequest request, RpcCallback<AggregateProtos.AggregateResult> done) {
         AggregateResult result = null;
         try {
             result = this.aggregate(ProtoBufConverter.fromPBEntityDefinition(request.getEntityDefinition()),
-                    ProtoBufConverter.fromPBScan(request.getScan()),
-                    ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()),
-                    ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()),
-                    ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList()),
-                    request.getStartTime(),
-                    request.getEndTime(),
-                    request.getIntervalMin()
+                ProtoBufConverter.fromPBScan(request.getScan()),
+                ProtoBufConverter.fromPBStringList(request.getGroupbyFieldsList()),
+                ProtoBufConverter.fromPBByteArrayList(request.getAggregateFuncTypesList()),
+                ProtoBufConverter.fromPBStringList(request.getAggregatedFieldsList())
             );
         } catch (IOException e) {
-            LOG.error("Failed to convert result to PB-based message",e);
             ResponseConverter.setControllerException(controller, e);
         }
         try {
             done.run(ProtoBufConverter.toPBAggregateResult(result));
         } catch (IOException e) {
-            LOG.error("Failed to convert result to PB-based message",e);
-            ResponseConverter.setControllerException(controller, e);
+            throw new RuntimeException("Failed to convert result to PB-based message", e);
         }
     }
 
-    private final static Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class);
-    /**
-     *
-     * @param entityDefinition
-     * @param scan
-     * @param groupbyFields
-     * @param aggregateFuncTypes
-     * @param aggregatedFields
-     * @return
-     * @throws Exception
-     */
     @Override
-    public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
-//		LOG.info("Using coprocessor instance: "+this);
+    public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields,
+                                     List<byte[]> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
         checkNotNull(entityDefinition, "entityDefinition");
         String serviceName = entityDefinition.getService();
-        LOG.info(this.getLogHeader() +" raw group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields);
-        if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
-        long _start = System.currentTimeMillis();
-        final RawAggregator aggregator = new RawAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,entityDefinition);
+        LOG.info(this.getLogHeader() + " raw group aggregate on service: " + serviceName
+            + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("SCAN: " + scan.toJSON());
+        }
+        final long startTimestamp = System.currentTimeMillis();
+        final RawAggregator aggregator = new RawAggregator(groupbyFields,
+            AggregateFunctionType.fromBytesList(aggregateFuncTypes), aggregatedFields, entityDefinition);
         InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator);
 
         List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
@@ -384,34 +382,31 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol
         result.setStopTimestamp(report.getStopTimestamp());
 
         long _stop = System.currentTimeMillis();
-        LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
+        LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms",
+            this.getLogHeader(), report.getCounter(), keyValues.size(), report.getStartTimestamp(),
+            report.getStopTimestamp(), (_stop - startTimestamp)));
 
         return result;
     }
 
-    /**
-     * TODO: refactor time series aggregator to remove dependency of business logic entity class
-     *
-     * @param entityDefinition
-     * @param scan
-     * @param groupbyFields
-     * @param aggregateFuncTypes
-     * @param aggregatedFields
-     * @param intervalMin
-     * @return
-     * @throws Exception
-     */
     @Override
-    public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields, long startTime,long endTime,long intervalMin) throws IOException {
-//		LOG.info("Using coprocessor instance: "+this);
+    public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields,
+                                     List<byte[]> aggregateFuncTypes, List<String> aggregatedFields, long startTime, long endTime, long intervalMin) throws IOException {
         checkNotNull(entityDefinition, "entityDefinition");
         String serviceName = entityDefinition.getService();
-        LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields + " intervalMin: " + intervalMin +
-                " from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime) + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime));
-        if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
-        long _start = System.currentTimeMillis();
-        final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,startTime,endTime,intervalMin);
-        InternalReadReport report = this.asyncStreamRead(entityDefinition, scan,aggregator);
+        LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName
+            + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes)
+            + " fields: " + aggregatedFields + " intervalMin: " + intervalMin
+            + " from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime)
+            + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("SCAN: " + scan.toJSON());
+        }
+
+        final long startTimestamp = System.currentTimeMillis();
+        final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields,
+            AggregateFunctionType.fromBytesList(aggregateFuncTypes), aggregatedFields, startTime, endTime, intervalMin);
+        InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator);
         List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
 
         AggregateResult result = new AggregateResult();
@@ -420,28 +415,9 @@ public class AggregateProtocolEndPoint extends AggregateProtos.AggregateProtocol
         result.setStopTimestamp(report.getStopTimestamp());
 
         long _stop = System.currentTimeMillis();
-        LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
+        LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms",
+            this.getLogHeader(), report.getCounter(), keyValues.size(), report.getStartTimestamp(), report.getStopTimestamp(), (_stop - startTimestamp)));
 
         return result;
     }
-
-//	/**
-//	 * Initialization per aggregate RPC call
-//	 */
-//	private void _init(){
-//		this.startTimestamp = 0;
-//		this.stopTimestamp = 0;
-//	}
-//
-//	// Min
-//	private long startTimestamp;
-//	// Max
-//	private long stopTimestamp;
-//
-//	public long getStartTimestamp() {
-//		return startTimestamp;
-//	}
-//	public long getStopTimestamp() {
-//		return stopTimestamp;
-//	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
index 84380eb..a49ad57 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResult.java
@@ -31,8 +31,7 @@ import java.io.Serializable;
 import java.util.List;
 
 /**
- * Aggregated writable result consist of group-by key-values list and additional meta information
- * 
+ * Aggregated writable result consisting of a group-by key-values list and additional meta information.
  * <h2>Schema</h2>
  * <pre>
  * {
@@ -42,88 +41,88 @@ import java.util.List;
  * }
  * </pre>
  */
-public class AggregateResult implements Writable,Serializable{
-
-	private final static Logger LOG = LoggerFactory.getLogger(AggregateResult.class);
-
-	/**
-	 * Automatically generated default serialVersionUID
-	 */
-	private static final long serialVersionUID = 1L;
-	
-	private final WritableList<GroupbyKeyValue> keyValues;
-	
-	private long startTimestamp = 0;
-	
-	public long getStartTimestamp() {
-		return startTimestamp;
-	}
-
-	public void setStartTimestamp(long startTimestamp) {
-		this.startTimestamp = startTimestamp;
-	}
-
-	public long getStopTimestamp() {
-		return stopTimestamp;
-	}
-
-	public void setStopTimestamp(long stopTimestamp) {
-		this.stopTimestamp = stopTimestamp;
-	}
-
-	public WritableList<GroupbyKeyValue> getKeyValues() {
-		return keyValues;
-	}
-	
-	public void setKeyValues(List<GroupbyKeyValue> keyValues){
-		this.keyValues.addAll(keyValues);
-	}
-	
-	private long stopTimestamp;
-	
-	public AggregateResult(){
-		this.keyValues = new WritableList<GroupbyKeyValue>(GroupbyKeyValue.class);
-	}
-	
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		this.startTimestamp = in.readLong();
-		this.stopTimestamp = in.readLong();
-		keyValues.readFields(in);
-	}
-	
-	@Override
-	public void write(DataOutput out) throws IOException {
-		out.writeLong(this.startTimestamp);
-		out.writeLong(this.stopTimestamp);
-		keyValues.write(out);
-	}
-
-
-	public static AggregateResult build(List<String[]> keys,List<double[]> values,List<Integer> counts,long startTimestamp,long stopTimestamp){
-		if(keys.size() > values.size()){
-			throw new IllegalArgumentException("keys' size: "+keys.size()+" not equal with values' size: "+values.size());
-		}
-		AggregateResult result = new AggregateResult();
-		result.setStartTimestamp(startTimestamp);
-		result.setStopTimestamp(stopTimestamp);
-		WritableList<GroupbyKeyValue> keyValues = new WritableList<GroupbyKeyValue>(GroupbyKeyValue.class,keys.size());
-
-		for(int i=0;i<keys.size();i++) {
-			String[] key  = keys.get(i);
-			GroupbyKey gkey = new GroupbyKey();
-			for (String k : key) {
-				gkey.addValue(k.getBytes());
-			}
-			GroupbyValue gvalue = new GroupbyValue();
-			double[] value = values.get(i);
-			for(double v:value){
-				gvalue.add(v);
-				gvalue.addMeta(counts.get(i));
-			}
-			keyValues.add(new GroupbyKeyValue(gkey, gvalue));
-		}
-		result.setKeyValues(keyValues);
-		return result;
-	}
+public class AggregateResult implements Writable, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AggregateResult.class);
+
+    /**
+     * Automatically generated default serialVersionUID.
+     */
+    private static final long serialVersionUID = 1L;
+
+    private final WritableList<GroupbyKeyValue> keyValues;
+
+    private long startTimestamp = 0;
+
+    public long getStartTimestamp() {
+        return startTimestamp;
+    }
+
+    public void setStartTimestamp(long startTimestamp) {
+        this.startTimestamp = startTimestamp;
+    }
+
+    public long getStopTimestamp() {
+        return stopTimestamp;
+    }
+
+    public void setStopTimestamp(long stopTimestamp) {
+        this.stopTimestamp = stopTimestamp;
+    }
+
+    public WritableList<GroupbyKeyValue> getKeyValues() {
+        return keyValues;
+    }
+
+    public void setKeyValues(List<GroupbyKeyValue> keyValues) {
+        this.keyValues.addAll(keyValues);
+    }
+
+    private long stopTimestamp;
+
+    public AggregateResult() {
+        this.keyValues = new WritableList<GroupbyKeyValue>(GroupbyKeyValue.class);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.startTimestamp = in.readLong();
+        this.stopTimestamp = in.readLong();
+        keyValues.readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeLong(this.startTimestamp);
+        out.writeLong(this.stopTimestamp);
+        keyValues.write(out);
+    }
+
+
+    public static AggregateResult build(List<String[]> keys, List<double[]> values, List<Integer> counts, long startTimestamp, long stopTimestamp) {
+        if (keys.size() > values.size()) {
+            throw new IllegalArgumentException("keys' size: " + keys.size() + " not equal with values' size: " + values.size());
+        }
+        AggregateResult result = new AggregateResult();
+        result.setStartTimestamp(startTimestamp);
+        result.setStopTimestamp(stopTimestamp);
+        WritableList<GroupbyKeyValue> keyValues = new WritableList<GroupbyKeyValue>(GroupbyKeyValue.class, keys.size());
+
+        for (int i = 0; i < keys.size(); i++) {
+            String[] key = keys.get(i);
+            GroupbyKey gkey = new GroupbyKey();
+            for (String k : key) {
+                gkey.addValue(k.getBytes());
+            }
+            GroupbyValue gvalue = new GroupbyValue();
+            double[] value = values.get(i);
+            for (double v : value) {
+                gvalue.add(v);
+                gvalue.addMeta(counts.get(i));
+            }
+            keyValues.add(new GroupbyKeyValue(gkey, gvalue));
+        }
+        result.setKeyValues(keyValues);
+        return result;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
index a68a592..306a6d1 100755
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/AggregateResultCallback.java
@@ -21,28 +21,20 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
 
 /**
  * <h1>AggregateResultCallback Interface</h1>
- *
  * Merge coprocessor results from different regions and generate final aggregate result
  * <br/>
- *
- * @see org.apache.hadoop.hbase.client.HTableInterface
- * 		coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable) throws IOException, Throwable;
- *
+ * @see org.apache.hadoop.hbase.client.HTableInterface#coprocessorService(Class, byte[], byte[], Batch.Call, Batch.Callback)
  */
-public interface AggregateResultCallback extends Batch.Callback<AggregateProtos.AggregateResult>{
-	/**
-	 * Generate final result after callback from region servers
-	 *
-	 * @return AggregateResult
-	 */
+public interface AggregateResultCallback extends Batch.Callback<AggregateProtos.AggregateResult> {
+    /**
+     * Generate final result after callback from region servers.
+     *
+     * @return AggregateResult
+     */
     AggregateResult result();
 
     /**
-     * Compatible for older callback interface in 0.94 or older
-     *
-     * @param region
-     * @param row
-     * @param result
+     * Compatible with the older callback interface in HBase 0.94 or earlier.
      */
     void update(byte[] region, byte[] row, AggregateResult result);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
index 060d3ba..c3c57ed 100644
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
+++ b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/ProtoBufConverter.java
@@ -32,44 +32,63 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * The protocol adapter for migrating from hbase-0.94 to hbase-0.96+
- *
- * @since 6/3/15
+ * The protocol adapter for <code>hbase-0.98</code> and <code>protobuf-2.5</code>.
  */
 public final class ProtoBufConverter {
     public static AggregateResult fromPBResult(AggregateProtos.AggregateResult pbResult) throws IOException {
-        ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbResult.getByteArray().toByteArray());;
+        ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbResult.getByteArray().toByteArray());
+        ;
         AggregateResult result = new AggregateResult();
         result.readFields(byteArrayDataInput);
         return result;
     }
 
-    public static AggregateProtos.AggregateRequest toPBRequest(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypesBytes, List<String> aggregatedFields) throws IOException {
+    public static AggregateProtos.AggregateRequest toPBRequest(
+            EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields,
+            List<byte[]> aggregateFuncTypesBytes, List<String> aggregatedFields) throws IOException {
         AggregateProtos.AggregateRequest.Builder builder = AggregateProtos.AggregateRequest.newBuilder()
                 .setEntityDefinition(AggregateProtos.EntityDefinition.newBuilder().setByteArray(writableToByteString(entityDefinition)))
                 .setScan(toPBScan(scan));
 
-        for(String groupbyField:groupbyFields) builder.addGroupbyFields(groupbyField);
-        for(byte[] funcTypeBytes:aggregateFuncTypesBytes) builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes));
-        for(String aggField:aggregatedFields) builder.addAggregatedFields(aggField);
+        for (String groupbyField : groupbyFields) {
+            builder.addGroupbyFields(groupbyField);
+        }
+
+        for (byte[] funcTypeBytes : aggregateFuncTypesBytes) {
+            builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes));
+        }
+
+        for (String aggField : aggregatedFields) {
+            builder.addAggregatedFields(aggField);
+        }
 
         return builder.build();
     }
 
     public static ByteString writableToByteString(Writable writable) throws IOException {
-        ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();;
+        ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
         writable.write(dataOutput);
         return ByteString.copyFrom(dataOutput.toByteArray());
     }
 
-    public static AggregateProtos.TimeSeriesAggregateRequest toPBTimeSeriesRequest(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypesBytes, List<String> aggregatedFields, long startTime, long endTime, long intervalMin) throws IOException {
-        AggregateProtos.TimeSeriesAggregateRequest.Builder builder = AggregateProtos.TimeSeriesAggregateRequest.newBuilder()
+    public static AggregateProtos.TimeSeriesAggregateRequest toPBTimeSeriesRequest(
+            EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields,
+            List<byte[]> aggregateFuncTypesBytes, List<String> aggregatedFields,
+            long startTime, long endTime, long intervalMin) throws IOException {
+        AggregateProtos.TimeSeriesAggregateRequest.Builder builder =
+                AggregateProtos.TimeSeriesAggregateRequest.newBuilder()
                 .setEntityDefinition(AggregateProtos.EntityDefinition.newBuilder().setByteArray(writableToByteString(entityDefinition)))
                 .setScan(toPBScan(scan));
 
-        for(String groupbyField:groupbyFields) builder.addGroupbyFields(groupbyField);
-        for(byte[] funcTypeBytes:aggregateFuncTypesBytes) builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes));
-        for(String aggField:aggregatedFields) builder.addAggregatedFields(aggField);
+        for (String groupbyField : groupbyFields) {
+            builder.addGroupbyFields(groupbyField);
+        }
+        for (byte[] funcTypeBytes : aggregateFuncTypesBytes) {
+            builder.addAggregateFuncTypes(ByteString.copyFrom(funcTypeBytes));
+        }
+        for (String aggField : aggregatedFields) {
+            builder.addAggregatedFields(aggField);
+        }
 
         builder.setStartTime(startTime);
         builder.setEndTime(endTime);
@@ -79,33 +98,36 @@ public final class ProtoBufConverter {
     }
 
     public static EntityDefinition fromPBEntityDefinition(AggregateProtos.EntityDefinition entityDefinition) throws IOException {
-        ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(entityDefinition.getByteArray().toByteArray());;
+        ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(entityDefinition.getByteArray().toByteArray());
+        ;
         EntityDefinition result = new EntityDefinition();
         result.readFields(byteArrayDataInput);
         return result;
     }
 
-    public static List<String> fromPBStringList(com.google.protobuf.ProtocolStringList groupbyFieldsList) {
-        List<String> result = new ArrayList<>(groupbyFieldsList.size());
-        for(ByteString byteString:groupbyFieldsList.asByteStringList()){
-            result.add(byteString.toStringUtf8());
-        }
-        return result;
+    //    /**
+    //     * For protobuffer-2.6
+    //     */
+    //    public static List<String> fromPBStringList(com.google.protobuf.ProtocolStringList groupbyFieldsList) {
+    //        List<String> result = new ArrayList<>(groupbyFieldsList.size());
+    //        for(ByteString byteString:groupbyFieldsList.asByteStringList()){
+    //            result.add(byteString.toStringUtf8());
+    //        }
+    //        return result;
+    //    }
+
+    public static List<String> fromPBStringList(List<String> groupbyFieldsList) {
+        return groupbyFieldsList;
     }
 
     public static List<byte[]> fromPBByteArrayList(List<ByteString> aggregateFuncTypesList) {
         List<byte[]> bytesArrayList = new ArrayList<>(aggregateFuncTypesList.size());
-        for(ByteString byteString:aggregateFuncTypesList){
+        for (ByteString byteString : aggregateFuncTypesList) {
             bytesArrayList.add(byteString.toByteArray());
         }
         return bytesArrayList;
     }
 
-    /**
-     *
-     * @param scan
-     * @return
-     */
     public static Scan fromPBScan(ClientProtos.Scan scan) throws IOException {
         return ProtobufUtil.toScan(scan);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/21187b55/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java
deleted file mode 100755
index af213e9..0000000
--- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/coprocessor/_AggregateProtocolEndPoint.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//package eagle.storage.hbase.query.coprocessor;
-//
-//import eagle.log.entity.meta.EntityDefinition;
-//import eagle.query.aggregate.AggregateFunctionType;
-//import eagle.query.aggregate.raw.GroupbyKeyValue;
-//import eagle.query.aggregate.raw.RawAggregator;
-//import eagle.query.aggregate.timeseries.TimeSeriesAggregator;
-//import eagle.storage.hbase.query.coprocessor.generated.AggregateProtos;
-//import eagle.storage.hbase.query.coprocessor.impl.AbstractAggregateEndPoint;
-//import hadoop.eagle.common.DateTimeUtil;
-//import com.google.protobuf.RpcCallback;
-//import com.google.protobuf.RpcController;
-//import org.apache.hadoop.hbase.client.Scan;
-//import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-//import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-//import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import java.io.IOException;
-//import java.util.List;
-//
-///**
-// * Coprocessor EndPoint of protocol <code>AggregateProtocol</code>
-// *
-// * <br/>
-// * <h2>Deployment:</h2>
-// *
-// * Firstly deploy jar files to cluster on local file system or HDFS.<br/><br/>
-// * Secondly configure in <code>hbase-site.xml</code> as following:
-// * <pre>
-// * &lt;property&gt;
-// *   &lt;name>hbase.coprocessor.region.classes&lt;/name&gt;
-// *   &lt;value>AggregateProtocolEndPoint&lt;/value&gt;
-// * &lt;/property&gt;
-// * </pre>
-// * Or register on related hbase tables
-// * <pre>
-// * hbase(main):005:0>  alter 't1', METHOD => 'table_att', 'coprocessor'=>'hdfs:///foo.jar|AggregateProtocolEndPoint|1001|'
-// * </pre>
-// *
-// * <h2>Reference:</h2>
-// * <a href="https://blogs.apache.org/hbase/entry/coprocessor_introduction">
-// * 	Coprocessor Introduction
-// * 	(Authors: Trend Micro Hadoop Group: Mingjie Lai, Eugene Koontz, Andrew Purtell)
-// * </a> <br/><br/>
-// *
-// * @see AggregateProtocol
-// *
-//// * @since : 10/31/14,2014
-// */
-//@SuppressWarnings("unused")
-//public class AggregateProtocolEndPoint extends AbstractAggregateEndPoint {
-//	private final static Logger LOG = LoggerFactory.getLogger(AggregateProtocolEndPoint.class);
-//	/**
-//	 *
-//	 * @param entityDefinition
-//	 * @param scan
-//	 * @param groupbyFields
-//	 * @param aggregateFuncTypes
-//	 * @param aggregatedFields
-//	 * @return
-//	 * @throws Exception
-//	 */
-//	@Override
-//	public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields) throws IOException {
-////		LOG.info("Using coprocessor instance: "+this);
-//		checkNotNull(entityDefinition, "entityDefinition");
-//		String serviceName = entityDefinition.getService();
-//		LOG.info(this.getLogHeader() +" raw group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields);
-//		if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
-//		long _start = System.currentTimeMillis();
-//		final RawAggregator aggregator = new RawAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,entityDefinition);
-//		InternalReadReport report = this.asyncStreamRead(entityDefinition, scan, aggregator);
-//
-//		List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
-//		AggregateResult result = new AggregateResult();
-//		result.setKeyValues(keyValues);
-//		result.setStartTimestamp(report.getStartTimestamp());
-//		result.setStopTimestamp(report.getStopTimestamp());
-//
-//		long _stop = System.currentTimeMillis();
-//		LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
-//
-//		return result;
-//	}
-//
-//	/**
-//	 * TODO: refactor time series aggregator to remove dependency of business logic entity class
-//	 *
-//	 * @param entityDefinition
-//	 * @param scan
-//	 * @param groupbyFields
-//	 * @param aggregateFuncTypes
-//	 * @param aggregatedFields
-//	 * @param intervalMin
-//	 * @return
-//	 * @throws Exception
-//	 */
-//	@Override
-//	public AggregateResult aggregate(EntityDefinition entityDefinition, Scan scan, List<String> groupbyFields, List<byte[]> aggregateFuncTypes, List<String> aggregatedFields, long startTime,long endTime,long intervalMin) throws IOException {
-////		LOG.info("Using coprocessor instance: "+this);
-//		checkNotNull(entityDefinition, "entityDefinition");
-//		String serviceName = entityDefinition.getService();
-//		LOG.info(this.getLogHeader() + " time series group aggregate on service: " + serviceName + " by: " + groupbyFields + " func: " + AggregateFunctionType.fromBytesList(aggregateFuncTypes) + " fields: " + aggregatedFields + " intervalMin: " + intervalMin +
-//				" from: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(startTime) + " to: " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(endTime));
-//		if(LOG.isDebugEnabled()) LOG.debug("SCAN: "+scan.toJSON());
-//		long _start = System.currentTimeMillis();
-//		final TimeSeriesAggregator aggregator = new TimeSeriesAggregator(groupbyFields,AggregateFunctionType.fromBytesList(aggregateFuncTypes),aggregatedFields,startTime,endTime,intervalMin);
-//		InternalReadReport report = this.asyncStreamRead(entityDefinition, scan,aggregator);
-//		List<GroupbyKeyValue> keyValues = aggregator.getGroupbyKeyValues();
-//
-//		AggregateResult result = new AggregateResult();
-//		result.setKeyValues(keyValues);
-//		result.setStartTimestamp(report.getStartTimestamp());
-//		result.setStopTimestamp(report.getStopTimestamp());
-//
-//		long _stop = System.currentTimeMillis();
-//		LOG.info(String.format("%s: scan = %d rows, group = %d keys, startTime = %d, endTime = %d, spend = %d ms", this.getLogHeader(),report.getCounter(),keyValues.size(),report.getStartTimestamp(),report.getStopTimestamp(),(_stop - _start)));
-//
-//		return result;
-//	}
-//}
\ No newline at end of file