You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ja...@apache.org on 2018/02/07 07:07:16 UTC
[09/10] eagle git commit: [EAGLE-1081] Checkstyle fixes for
eagle-entity-base module
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
index c3d916e..6dfe27d 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReader.java
@@ -29,97 +29,103 @@ import java.util.ArrayList;
import java.util.List;
public class GenericEntityStreamReader extends StreamReader {
- private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReader.class);
-
- private EntityDefinition entityDef;
- private SearchCondition condition;
- private String prefix;
- private StreamReader readerAfterPlan;
-
- public GenericEntityStreamReader(String serviceName, SearchCondition condition) throws InstantiationException, IllegalAccessException{
- this(serviceName, condition, null);
- }
-
- public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition) throws InstantiationException, IllegalAccessException{
- this(entityDef, condition, entityDef.getPrefix());
- }
-
- public GenericEntityStreamReader(String serviceName, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
- this.prefix = prefix;
- checkNotNull(serviceName, "serviceName");
- this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
- checkNotNull(entityDef, "EntityDefinition");
- this.condition = condition;
- this.readerAfterPlan = selectQueryReader();
- }
-
- public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) throws InstantiationException, IllegalAccessException{
- this.prefix = prefix;
- checkNotNull(entityDef, "entityDef");
- this.entityDef = entityDef;
- checkNotNull(entityDef, "EntityDefinition");
- this.condition = condition;
- this.readerAfterPlan = selectQueryReader();
- }
-
- private void checkNotNull(Object o, String message){
- if(o == null){
- throw new IllegalArgumentException(message + " should not be null");
- }
- }
-
- public EntityDefinition getEntityDefinition() {
- return entityDef;
- }
-
- public SearchCondition getSearchCondition() {
- return condition;
- }
-
- @Override
- public void readAsStream() throws Exception{
- readerAfterPlan._listeners.addAll(this._listeners);
- readerAfterPlan.readAsStream();
- }
-
- private StreamReader selectQueryReader() throws InstantiationException, IllegalAccessException {
- final ORExpression query = condition.getQueryExpression();
- IndexDefinition[] indexDefs = entityDef.getIndexes();
+ private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReader.class);
+
+ private EntityDefinition entityDef;
+ private SearchCondition condition;
+ private String prefix;
+ private StreamReader readerAfterPlan;
+
+ public GenericEntityStreamReader(String serviceName, SearchCondition condition)
+ throws InstantiationException, IllegalAccessException {
+ this(serviceName, condition, null);
+ }
+
+ public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition)
+ throws InstantiationException, IllegalAccessException {
+ this(entityDef, condition, entityDef.getPrefix());
+ }
+
+ public GenericEntityStreamReader(String serviceName, SearchCondition condition, String prefix)
+ throws InstantiationException, IllegalAccessException {
+ this.prefix = prefix;
+ checkNotNull(serviceName, "serviceName");
+ this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+ checkNotNull(entityDef, "EntityDefinition");
+ this.condition = condition;
+ this.readerAfterPlan = selectQueryReader();
+ }
+
+ public GenericEntityStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix)
+ throws InstantiationException, IllegalAccessException {
+ this.prefix = prefix;
+ checkNotNull(entityDef, "entityDef");
+ this.entityDef = entityDef;
+ checkNotNull(entityDef, "EntityDefinition");
+ this.condition = condition;
+ this.readerAfterPlan = selectQueryReader();
+ }
+
+ private void checkNotNull(Object o, String message) {
+ if (o == null) {
+ throw new IllegalArgumentException(message + " should not be null");
+ }
+ }
+
+ public EntityDefinition getEntityDefinition() {
+ return entityDef;
+ }
+
+ public SearchCondition getSearchCondition() {
+ return condition;
+ }
+
+ @Override
+ public void readAsStream() throws Exception {
+ readerAfterPlan.listeners.addAll(this.listeners);
+ readerAfterPlan.readAsStream();
+ }
+
+ private StreamReader selectQueryReader() throws InstantiationException, IllegalAccessException {
+ final ORExpression query = condition.getQueryExpression();
+ IndexDefinition[] indexDefs = entityDef.getIndexes();
// Index just works with query condition
- if (indexDefs != null && condition.getQueryExpression()!=null) {
- List<byte[]> rowkeys = new ArrayList<>();
- for (IndexDefinition index : indexDefs) {
- // Check unique index first
- if (index.isUnique()) {
- final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
- if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
- LOG.info("Selectd query unique index " + index.getIndexName() + " for query: " + condition.getQueryExpression());
- return new UniqueIndexStreamReader(index, condition, rowkeys);
- }
- }
- }
- for (IndexDefinition index : indexDefs) {
- // Check non-clustered index
- if (!index.isUnique()) {
- final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
- if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
- LOG.info("Selectd query non clustered index " + index.getIndexName() + " for query: " + condition.getQueryExpression().toString());
- return new NonClusteredIndexStreamReader(index, condition, rowkeys);
- }
- }
- }
- }
- return new GenericEntityScanStreamReader(entityDef, condition, this.prefix);
- }
-
- @Override
- public long getLastTimestamp() {
- return readerAfterPlan.getLastTimestamp();
- }
-
- @Override
- public long getFirstTimestamp() {
- return readerAfterPlan.getFirstTimestamp();
- }
+ if (indexDefs != null && condition.getQueryExpression() != null) {
+ List<byte[]> rowkeys = new ArrayList<>();
+ for (IndexDefinition index : indexDefs) {
+ // Check unique index first
+ if (index.isUnique()) {
+ final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
+ if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
+ LOG.info("Selectd query unique index " + index.getIndexName() + " for query: "
+ + condition.getQueryExpression());
+ return new UniqueIndexStreamReader(index, condition, rowkeys);
+ }
+ }
+ }
+ for (IndexDefinition index : indexDefs) {
+ // Check non-clustered index
+ if (!index.isUnique()) {
+ final IndexDefinition.IndexType type = index.canGoThroughIndex(query, rowkeys);
+ if (!IndexDefinition.IndexType.NON_INDEX.equals(type)) {
+ LOG.info("Selectd query non clustered index " + index.getIndexName() + " for query: "
+ + condition.getQueryExpression().toString());
+ return new NonClusteredIndexStreamReader(index, condition, rowkeys);
+ }
+ }
+ }
+ }
+ return new GenericEntityScanStreamReader(entityDef, condition, this.prefix);
+ }
+
+ @Override
+ public long getLastTimestamp() {
+ return readerAfterPlan.getLastTimestamp();
+ }
+
+ @Override
+ public long getFirstTimestamp() {
+ return readerAfterPlan.getFirstTimestamp();
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
index bf72a36..15bdd20 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityStreamReaderMT.java
@@ -31,121 +31,124 @@ import org.apache.eagle.common.DateTimeUtil;
/**
* multi-threading stream readers which only applies to time-series entity where we split the query into
- * different time range
- *
- * When this class is used together with list query or aggregate query, be aware that the query's behavior could
- * be changed for example pageSize does not work well, output sequence is not determined
+ * different time range When this class is used together with list query or aggregate query, be aware that the
+ * query's behavior could be changed for example pageSize does not work well, output sequence is not
+ * determined
*/
-public class GenericEntityStreamReaderMT extends StreamReader{
- private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReaderMT.class);
- private List<GenericEntityStreamReader> readers = new ArrayList<GenericEntityStreamReader>();
-
- public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition, int numThreads) throws Exception{
- checkIsTimeSeries(serviceName);
- checkNumThreads(numThreads);
- long queryStartTime = condition.getStartTime();
- long queryEndTime = condition.getEndTime();
- long subStartTime = queryStartTime;
- long subEndTime = 0;
- long interval = (queryEndTime-queryStartTime) / numThreads;
- for(int i=0; i<numThreads; i++){
- // split search condition by time range
- subStartTime = queryStartTime + i*interval;
- if(i == numThreads-1){
- subEndTime = queryEndTime;
- }else{
- subEndTime = subStartTime + interval;
- }
- //String strStartTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime);
- //String strEndTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime);
- SearchCondition sc = new SearchCondition(condition);
- sc.setStartTime(subStartTime);
- sc.setEndTime(subEndTime);
- GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, sc);
- readers.add(reader);
- }
- }
-
- private void checkIsTimeSeries(String serviceName) throws Exception{
- EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
- if(!ed.isTimeSeries()){
- throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table");
- }
- }
-
- private void checkNumThreads(int numThreads){
- if(numThreads <= 0){
- throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1");
- }
- }
-
- /**
- * default to 2 threads
- * @param serviceName
- * @param condition
- */
- public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition) throws Exception{
- this(serviceName, condition, 2);
- }
-
- @Override
- public void readAsStream() throws Exception{
- // populate listeners to all readers
- for(EntityCreationListener l : _listeners){
- for(GenericEntityStreamReader r : readers){
- r.register(l);
- }
- }
-
- List<Future<Void>> futures = new ArrayList<Future<Void>>();
- for(GenericEntityStreamReader r : readers){
- SingleReader reader = new SingleReader(r);
- Future<Void> readFuture = EagleConfigFactory.load().getExecutor().submit(reader);
- futures.add(readFuture);
- }
-
- // join threads and check exceptions
- for(Future<Void> future : futures){
- try{
- future.get();
- }catch(Exception ex){
- LOG.error("Error in read", ex);
- throw ex;
- }
- }
- }
-
- private static class SingleReader implements Callable<Void>{
- private GenericEntityStreamReader reader;
- public SingleReader(GenericEntityStreamReader reader){
- this.reader = reader;
- }
- @Override
- public Void call() throws Exception{
- reader.readAsStream();
- return null;
- }
- }
-
- @Override
- public long getLastTimestamp() {
- long lastTimestamp = 0;
- for (GenericEntityStreamReader reader : readers) {
- if (lastTimestamp < reader.getLastTimestamp()) {
- lastTimestamp = reader.getLastTimestamp();
- }
- }
- return lastTimestamp;
- }
-
- @Override
- public long getFirstTimestamp() {
- long firstTimestamp = 0;
- for (GenericEntityStreamReader reader : readers) {
- if (firstTimestamp > reader.getLastTimestamp() || firstTimestamp == 0) {
- firstTimestamp = reader.getLastTimestamp();
- }
- }
- return firstTimestamp;
- }
+public class GenericEntityStreamReaderMT extends StreamReader {
+ private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReaderMT.class);
+ private List<GenericEntityStreamReader> readers = new ArrayList<GenericEntityStreamReader>();
+
+ public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition, int numThreads)
+ throws Exception {
+ checkIsTimeSeries(serviceName);
+ checkNumThreads(numThreads);
+ long queryStartTime = condition.getStartTime();
+ long queryEndTime = condition.getEndTime();
+ long subStartTime = queryStartTime;
+ long subEndTime = 0;
+ long interval = (queryEndTime - queryStartTime) / numThreads;
+ for (int i = 0; i < numThreads; i++) {
+ // split search condition by time range
+ subStartTime = queryStartTime + i * interval;
+ if (i == numThreads - 1) {
+ subEndTime = queryEndTime;
+ } else {
+ subEndTime = subStartTime + interval;
+ }
+ // String strStartTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime);
+ // String strEndTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime);
+ SearchCondition sc = new SearchCondition(condition);
+ sc.setStartTime(subStartTime);
+ sc.setEndTime(subEndTime);
+ GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, sc);
+ readers.add(reader);
+ }
+ }
+
+ private void checkIsTimeSeries(String serviceName) throws Exception {
+ EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+ if (!ed.isTimeSeries()) {
+ throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table");
+ }
+ }
+
+ private void checkNumThreads(int numThreads) {
+ if (numThreads <= 0) {
+ throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1");
+ }
+ }
+
+ /**
+ * default to 2 threads
+ *
+ * @param serviceName
+ * @param condition
+ */
+ public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition) throws Exception {
+ this(serviceName, condition, 2);
+ }
+
+ @Override
+ public void readAsStream() throws Exception {
+ // populate listeners to all readers
+ for (EntityCreationListener l : listeners) {
+ for (GenericEntityStreamReader r : readers) {
+ r.register(l);
+ }
+ }
+
+ List<Future<Void>> futures = new ArrayList<Future<Void>>();
+ for (GenericEntityStreamReader r : readers) {
+ SingleReader reader = new SingleReader(r);
+ Future<Void> readFuture = EagleConfigFactory.load().getExecutor().submit(reader);
+ futures.add(readFuture);
+ }
+
+ // join threads and check exceptions
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (Exception ex) {
+ LOG.error("Error in read", ex);
+ throw ex;
+ }
+ }
+ }
+
+ private static class SingleReader implements Callable<Void> {
+ private GenericEntityStreamReader reader;
+
+ public SingleReader(GenericEntityStreamReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ reader.readAsStream();
+ return null;
+ }
+ }
+
+ @Override
+ public long getLastTimestamp() {
+ long lastTimestamp = 0;
+ for (GenericEntityStreamReader reader : readers) {
+ if (lastTimestamp < reader.getLastTimestamp()) {
+ lastTimestamp = reader.getLastTimestamp();
+ }
+ }
+ return lastTimestamp;
+ }
+
+ @Override
+ public long getFirstTimestamp() {
+ long firstTimestamp = 0;
+ for (GenericEntityStreamReader reader : readers) {
+ if (firstTimestamp > reader.getLastTimestamp() || firstTimestamp == 0) {
+ firstTimestamp = reader.getLastTimestamp();
+ }
+ }
+ return firstTimestamp;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
index 5c8b12d..926fcba 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericEntityWriter.java
@@ -27,52 +27,53 @@ import java.util.ArrayList;
import java.util.List;
public class GenericEntityWriter {
- private static final Logger LOG = LoggerFactory.getLogger(GenericEntityWriter.class);
- private EntityDefinition entityDef;
+ private static final Logger LOG = LoggerFactory.getLogger(GenericEntityWriter.class);
+ private EntityDefinition entityDef;
- public GenericEntityWriter(String serviceName) throws InstantiationException, IllegalAccessException{
- this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
- checkNotNull(entityDef, "serviceName");
- }
+ public GenericEntityWriter(String serviceName) throws InstantiationException, IllegalAccessException {
+ this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName);
+ checkNotNull(entityDef, "serviceName");
+ }
- public GenericEntityWriter(EntityDefinition entityDef) throws InstantiationException, IllegalAccessException{
- this.entityDef = entityDef;
- checkNotNull(entityDef, "serviceName");
- }
-
- private void checkNotNull(Object o, String message) {
- if(o == null){
- throw new IllegalArgumentException(message + " should not be null");
- }
- }
+ public GenericEntityWriter(EntityDefinition entityDef)
+ throws InstantiationException, IllegalAccessException {
+ this.entityDef = entityDef;
+ checkNotNull(entityDef, "serviceName");
+ }
- /**
- * @param entities
- * @return row keys
- * @throws Exception
- */
- public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws Exception{
- HBaseLogWriter writer = new HBaseLogWriter(entityDef.getTable(), entityDef.getColumnFamily());
- List<String> rowkeys = new ArrayList<String>(entities.size());
- List<InternalLog> logs = new ArrayList<InternalLog>(entities.size());
-
- try{
- writer.open();
- for(TaggedLogAPIEntity entity : entities){
- final InternalLog entityLog = HBaseInternalLogHelper.convertToInternalLog(entity, entityDef);
- logs.add(entityLog);
- }
- List<byte[]> bRowkeys = writer.write(logs);
- for (byte[] rowkey : bRowkeys) {
- rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
- }
+ private void checkNotNull(Object o, String message) {
+ if (o == null) {
+ throw new IllegalArgumentException(message + " should not be null");
+ }
+ }
- }catch(Exception ex){
- LOG.error("fail writing tagged log", ex);
- throw ex;
- }finally{
- writer.close();
- }
- return rowkeys;
- }
+ /**
+ * @param entities
+ * @return row keys
+ * @throws Exception
+ */
+ public List<String> write(List<? extends TaggedLogAPIEntity> entities) throws Exception {
+ HBaseLogWriter writer = new HBaseLogWriter(entityDef.getTable(), entityDef.getColumnFamily());
+ List<String> rowkeys = new ArrayList<String>(entities.size());
+ List<InternalLog> logs = new ArrayList<InternalLog>(entities.size());
+
+ try {
+ writer.open();
+ for (TaggedLogAPIEntity entity : entities) {
+ final InternalLog entityLog = HBaseInternalLogHelper.convertToInternalLog(entity, entityDef);
+ logs.add(entityLog);
+ }
+ List<byte[]> bRowkeys = writer.write(logs);
+ for (byte[] rowkey : bRowkeys) {
+ rowkeys.add(EagleBase64Wrapper.encodeByteArray2URLSafeString(rowkey));
+ }
+
+ } catch (Exception ex) {
+ LOG.error("fail writing tagged log", ex);
+ throw ex;
+ } finally {
+ writer.close();
+ }
+ return rowkeys;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
index 9f6937b..56cd453 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntity.java
@@ -21,33 +21,35 @@ import org.apache.eagle.log.entity.meta.*;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
/**
- * GenericMetricEntity should use prefix field which is extended from TaggedLogAPIEntity as metric name
- * metric name is used to partition the metric tables
+ * GenericMetricEntity should use prefix field which is extended from TaggedLogAPIEntity as metric name metric
+ * name is used to partition the metric tables
*/
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@Table("eagle_metric")
@ColumnFamily("f")
@Prefix(GenericMetricEntity.GENERIC_METRIC_PREFIX_PLACE_HOLDER)
@Service(GenericMetricEntity.GENERIC_METRIC_SERVICE)
@TimeSeries(true)
-@Metric(interval=60000)
+@Metric(interval = 60000)
@ServicePath(path = "/metric")
// TODO:
-@Tags({"site","application","policyId","alertExecutorId", "streamName","source","partitionSeq"})
+@Tags({
+ "site", "application", "policyId", "alertExecutorId", "streamName", "source", "partitionSeq"
+ })
public class GenericMetricEntity extends TaggedLogAPIEntity {
- public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
- public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = "GENERIC_METRIC_PREFIX_PLACEHODLER";
- public static final String VALUE_FIELD ="value";
+ public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
+ public static final String GENERIC_METRIC_PREFIX_PLACE_HOLDER = "GENERIC_METRIC_PREFIX_PLACEHODLER";
+ public static final String VALUE_FIELD = "value";
- @Column("a")
- private double[] value;
+ @Column("a")
+ private double[] value;
- public double[] getValue() {
- return value;
- }
+ public double[] getValue() {
+ return value;
+ }
- public void setValue(double[] value) {
- this.value = value;
- pcs.firePropertyChange("value", null, null);
- }
-}
\ No newline at end of file
+ public void setValue(double[] value) {
+ this.value = value;
+ pcs.firePropertyChange("value", null, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
index 84b02ae..bc99a81 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityBatchReader.java
@@ -23,32 +23,37 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-public class GenericMetricEntityBatchReader implements EntityCreationListener{
- private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class);
-
- private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
- private GenericEntityStreamReader reader;
-
- public GenericMetricEntityBatchReader(String metricName, SearchCondition condition) throws Exception{
- reader = new GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, condition, metricName);
- }
-
- public long getLastTimestamp() {
- return reader.getLastTimestamp();
- }
- public long getFirstTimestamp() {
- return reader.getFirstTimestamp();
- }
- @Override
- public void entityCreated(TaggedLogAPIEntity entity){
- entities.add(entity);
- }
-
- @SuppressWarnings("unchecked")
- public <T> List<T> read() throws Exception{
- if(LOG.isDebugEnabled()) LOG.debug("Start reading as batch mode");
- reader.register(this);
- reader.readAsStream();
- return (List<T>)entities;
- }
+public class GenericMetricEntityBatchReader implements EntityCreationListener {
+ private static final Logger LOG = LoggerFactory.getLogger(GenericEntityBatchReader.class);
+
+ private List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+ private GenericEntityStreamReader reader;
+
+ public GenericMetricEntityBatchReader(String metricName, SearchCondition condition) throws Exception {
+ reader = new GenericEntityStreamReader(GenericMetricEntity.GENERIC_METRIC_SERVICE, condition,
+ metricName);
+ }
+
+ public long getLastTimestamp() {
+ return reader.getLastTimestamp();
+ }
+
+ public long getFirstTimestamp() {
+ return reader.getFirstTimestamp();
+ }
+
+ @Override
+ public void entityCreated(TaggedLogAPIEntity entity) {
+ entities.add(entity);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> List<T> read() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start reading as batch mode");
+ }
+ reader.register(this);
+ reader.readAsStream();
+ return (List<T>)entities;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
index 1cf3905..216022f 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricEntityDecompactionStreamReader.java
@@ -25,74 +25,79 @@ import org.slf4j.LoggerFactory;
import java.text.ParseException;
-public class GenericMetricEntityDecompactionStreamReader extends StreamReader implements EntityCreationListener{
- @SuppressWarnings("unused")
- private static final Logger LOG = LoggerFactory.getLogger(GenericMetricEntityDecompactionStreamReader.class);
- private GenericEntityStreamReader reader;
- private EntityDefinition ed;
- private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE;
- private long start;
- private long end;
- private GenericMetricShadowEntity single = new GenericMetricShadowEntity();
-
- /**
- * it makes sense that serviceName should not be provided while metric name should be provided as prefix
- * @param metricName
- * @param condition
- * @throws InstantiationException
- * @throws IllegalAccessException
- * @throws ParseException
- */
- public GenericMetricEntityDecompactionStreamReader(String metricName, SearchCondition condition) throws InstantiationException, IllegalAccessException, ParseException{
- ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
- checkIsMetric(ed);
- reader = new GenericEntityStreamReader(serviceName, condition, metricName);
- start = condition.getStartTime();
- end = condition.getEndTime();
- }
-
- private void checkIsMetric(EntityDefinition ed){
- if(ed.getMetricDefinition() == null)
- throw new IllegalArgumentException("Only metric entity comes here");
- }
-
- @Override
- public void entityCreated(TaggedLogAPIEntity entity) throws Exception{
- GenericMetricEntity e = (GenericMetricEntity)entity;
- double[] value = e.getValue();
- if(value != null) {
- int count =value.length;
- @SuppressWarnings("unused")
- Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
- for (int i = 0; i < count; i++) {
- long ts = entity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
- // exclude those entity which is not within the time range in search condition. [start, end)
- if (ts < start || ts >= end) {
- continue;
- }
- single.setTimestamp(ts);
- single.setTags(entity.getTags());
- single.setValue(e.getValue()[i]);
- for (EntityCreationListener l : _listeners) {
- l.entityCreated(single);
- }
- }
- }
- }
-
- @Override
- public void readAsStream() throws Exception{
- reader.register(this);
- reader.readAsStream();
- }
+public class GenericMetricEntityDecompactionStreamReader extends StreamReader
+ implements EntityCreationListener {
+ @SuppressWarnings("unused")
+ private static final Logger LOG = LoggerFactory
+ .getLogger(GenericMetricEntityDecompactionStreamReader.class);
+ private GenericEntityStreamReader reader;
+ private EntityDefinition ed;
+ private String serviceName = GenericMetricEntity.GENERIC_METRIC_SERVICE;
+ private long start;
+ private long end;
+ private GenericMetricShadowEntity single = new GenericMetricShadowEntity();
- @Override
- public long getLastTimestamp() {
- return reader.getLastTimestamp();
- }
+ /**
+ * it makes sense that serviceName should not be provided while metric name should be provided as prefix
+ *
+ * @param metricName
+ * @param condition
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws ParseException
+ */
+ public GenericMetricEntityDecompactionStreamReader(String metricName, SearchCondition condition)
+ throws InstantiationException, IllegalAccessException, ParseException {
+ ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
+ checkIsMetric(ed);
+ reader = new GenericEntityStreamReader(serviceName, condition, metricName);
+ start = condition.getStartTime();
+ end = condition.getEndTime();
+ }
- @Override
- public long getFirstTimestamp() {
- return reader.getFirstTimestamp();
- }
-}
\ No newline at end of file
+ private void checkIsMetric(EntityDefinition ed) {
+ if (ed.getMetricDefinition() == null) {
+ throw new IllegalArgumentException("Only metric entity comes here");
+ }
+ }
+
+ @Override
+ public void entityCreated(TaggedLogAPIEntity entity) throws Exception {
+ GenericMetricEntity e = (GenericMetricEntity)entity;
+ double[] value = e.getValue();
+ if (value != null) {
+ int count = value.length;
+ @SuppressWarnings("unused")
+ Class<?> cls = ed.getMetricDefinition().getSingleTimestampEntityClass();
+ for (int i = 0; i < count; i++) {
+ long ts = entity.getTimestamp() + i * ed.getMetricDefinition().getInterval();
+ // exclude those entity which is not within the time range in search condition. [start, end)
+ if (ts < start || ts >= end) {
+ continue;
+ }
+ single.setTimestamp(ts);
+ single.setTags(entity.getTags());
+ single.setValue(e.getValue()[i]);
+ for (EntityCreationListener l : listeners) {
+ l.entityCreated(single);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void readAsStream() throws Exception {
+ reader.register(this);
+ reader.readAsStream();
+ }
+
+ @Override
+ public long getLastTimestamp() {
+ return reader.getLastTimestamp();
+ }
+
+ @Override
+ public long getFirstTimestamp() {
+ return reader.getFirstTimestamp();
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
index acd1290..8ead7cd 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericMetricShadowEntity.java
@@ -22,13 +22,13 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
* just a shadow class to avoid dynamically create the class and instantiate using reflection
*/
public class GenericMetricShadowEntity extends TaggedLogAPIEntity {
- private double value;
+ private double value;
- public double getValue() {
- return value;
- }
+ public double getValue() {
+ return value;
+ }
- public void setValue(double value) {
- this.value = value;
- }
+ public void setValue(double value) {
+ this.value = value;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
index 6869c7c..97f538c 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
@@ -35,24 +35,27 @@ import java.util.Map;
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
-@XmlType(propOrder = {"success","exception","meta","type","obj"})
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@XmlType(propOrder = {
+ "success", "exception", "meta", "type", "obj"
+ })
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonDeserialize(using = GenericServiceAPIResponseEntityDeserializer.class)
-@JsonIgnoreProperties(ignoreUnknown=true)
-public class GenericServiceAPIResponseEntity<T>{
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GenericServiceAPIResponseEntity<T> {
/**
* Please use primitive type of value in meta as possible
*/
- private Map<String,Object> meta;
- private boolean success;
- private String exception;
+ private Map<String, Object> meta;
+ private boolean success;
+ private String exception;
private List<T> obj;
private Class<T> type;
- public GenericServiceAPIResponseEntity(){
+ public GenericServiceAPIResponseEntity() {
// default constructor
}
- public GenericServiceAPIResponseEntity(Class<T> type){
+
+ public GenericServiceAPIResponseEntity(Class<T> type) {
this.setType(type);
}
@@ -72,7 +75,7 @@ public class GenericServiceAPIResponseEntity<T>{
this.obj = obj;
}
- public void setObj(List<T> obj,Class<T> type) {
+ public void setObj(List<T> obj, Class<T> type) {
this.setObj(obj);
this.setType(type);
}
@@ -85,10 +88,10 @@ public class GenericServiceAPIResponseEntity<T>{
* Set the first object's class as type
*/
@SuppressWarnings("unused")
- public void setTypeByObj(){
- for(T t:this.obj){
- if(this.type == null && t!=null){
- this.type = (Class<T>) t.getClass();
+ public void setTypeByObj() {
+ for (T t : this.obj) {
+ if (this.type == null && t != null) {
+ this.type = (Class<T>)t.getClass();
}
}
}
@@ -102,17 +105,19 @@ public class GenericServiceAPIResponseEntity<T>{
this.type = type;
}
- public boolean isSuccess() {
- return success;
- }
- public void setSuccess(boolean success) {
- this.success = success;
- }
- public String getException() {
- return exception;
- }
-
- public void setException(Exception exceptionObj){
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ public String getException() {
+ return exception;
+ }
+
+ public void setException(Exception exceptionObj) {
this.exception = EagleExceptionWrapper.wrap(exceptionObj);
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
index 836295b..8ccb43a 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
@@ -30,57 +30,61 @@ import java.io.IOException;
import java.util.*;
/**
- * @since 3/18/15
+ * @since 3/18/15.
*/
-public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserializer<GenericServiceAPIResponseEntity> {
- private final static String META_FIELD="meta";
- private final static String SUCCESS_FIELD="success";
- private final static String EXCEPTION_FIELD="exception";
- private final static String OBJ_FIELD="obj";
- private final static String TYPE_FIELD="type";
+public class GenericServiceAPIResponseEntityDeserializer
+ extends JsonDeserializer<GenericServiceAPIResponseEntity> {
+ private static final String META_FIELD = "meta";
+ private static final String SUCCESS_FIELD = "success";
+ private static final String EXCEPTION_FIELD = "exception";
+ private static final String OBJ_FIELD = "obj";
+ private static final String TYPE_FIELD = "type";
@Override
- public GenericServiceAPIResponseEntity deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+ public GenericServiceAPIResponseEntity deserialize(JsonParser jp, DeserializationContext ctxt)
+ throws IOException, JsonProcessingException {
GenericServiceAPIResponseEntity entity = new GenericServiceAPIResponseEntity();
ObjectCodec objectCodec = jp.getCodec();
JsonNode rootNode = jp.getCodec().readTree(jp);
- if(rootNode.isObject()){
- Iterator<Map.Entry<String,JsonNode>> fields = rootNode.fields();
+ if (rootNode.isObject()) {
+ Iterator<Map.Entry<String, JsonNode>> fields = rootNode.fields();
JsonNode objNode = null;
- while(fields.hasNext()){
- Map.Entry<String,JsonNode> field = fields.next();
- if (META_FIELD.equals(field.getKey()) && field.getValue() != null)
+ while (fields.hasNext()) {
+ Map.Entry<String, JsonNode> field = fields.next();
+ if (META_FIELD.equals(field.getKey()) && field.getValue() != null) {
entity.setMeta(objectCodec.readValue(field.getValue().traverse(), Map.class));
- else if(SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null){
+ } else if (SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null) {
entity.setSuccess(field.getValue().booleanValue());
- }else if(EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null){
+ } else if (EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null) {
entity.setException(new Exception(field.getValue().textValue()));
- }else if(TYPE_FIELD.endsWith(field.getKey()) && field.getValue() != null){
- Preconditions.checkNotNull(field.getValue().textValue(),"Response type class is null");
+ } else if (TYPE_FIELD.endsWith(field.getKey()) && field.getValue() != null) {
+ Preconditions.checkNotNull(field.getValue().textValue(), "Response type class is null");
try {
entity.setType(Class.forName(field.getValue().textValue()));
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
- }else if(OBJ_FIELD.equals(field.getKey()) && field.getValue() != null){
+ } else if (OBJ_FIELD.equals(field.getKey()) && field.getValue() != null) {
objNode = field.getValue();
}
}
- if(objNode!=null) {
- JavaType collectionType=null;
+ if (objNode != null) {
+ JavaType collectionType = null;
if (entity.getType() != null) {
- collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, entity.getType());
- }else{
- collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, Map.class);
+ collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class,
+ entity.getType());
+ } else {
+ collectionType = TypeFactory.defaultInstance().constructCollectionType(LinkedList.class,
+ Map.class);
}
List obj = objectCodec.readValue(objNode.traverse(), collectionType);
entity.setObj(obj);
}
- }else{
+ } else {
throw new IOException("root node is not object");
}
return entity;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
index 7a38033..32f382b 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseInternalLogHelper.java
@@ -30,216 +30,232 @@ import org.slf4j.LoggerFactory;
import java.util.*;
public class HBaseInternalLogHelper {
- private final static Logger LOG = LoggerFactory.getLogger(HBaseInternalLogHelper.class);
-
- private static final EntitySerDeserializer ENTITY_SERDESER = new EntitySerDeserializer();
-
- /**
- *
- * @param ed
- * @param r
- * @param qualifiers if null, return all qualifiers defined in ed
- * @return
- */
- public static InternalLog parse(EntityDefinition ed, Result r, byte[][] qualifiers) {
- final byte[] row = r.getRow();
- // skip the first 4 bytes : prefix
- final int offset = (ed.getPartitions() == null) ? (4) : (4 + ed.getPartitions().length * 4);
- long timestamp = ByteUtil.bytesToLong(row, offset);
- // reverse timestamp
- timestamp = Long.MAX_VALUE - timestamp;
- final byte[] family = ed.getColumnFamily().getBytes();
- final Map<String, byte[]> allQualifierValues = new HashMap<String, byte[]>();
-
- if (qualifiers != null) {
- int count = qualifiers.length;
- final byte[][] values = new byte[count][];
- for (int i = 0; i < count; i++) {
- // TODO if returned value is null, it means no this column for this row, so why set null to the object?
- values[i] = r.getValue(family, qualifiers[i]);
- allQualifierValues.put(new String(qualifiers[i]), values[i]);
- }
- }else{
- // return all qualifiers
- for(KeyValue kv:r.list()){
- byte[] qualifier = kv.getQualifier();
- byte[] value = kv.getValue();
- allQualifierValues.put(new String(qualifier),value);
- }
- }
- final InternalLog log = buildObject(ed, row, timestamp, allQualifierValues);
- return log;
- }
-
- /**
- *
- * @param ed
- * @param row
- * @param timestamp
- * @param allQualifierValues <code>Map < Qualifier name (not display name),Value in bytes array ></code>
- * @return
- */
- public static InternalLog buildObject(EntityDefinition ed, byte[] row, long timestamp, Map<String, byte[]> allQualifierValues) {
- InternalLog log = new InternalLog();
- String myRow = EagleBase64Wrapper.encodeByteArray2URLSafeString(row);
- log.setEncodedRowkey(myRow);
- log.setPrefix(ed.getPrefix());
- log.setTimestamp(timestamp);
-
- Map<String, byte[]> logQualifierValues = new HashMap<String, byte[]>();
- Map<String, String> logTags = new HashMap<String, String>();
- Map<String, Object> extra = null;
-
- Map<String,Double> doubleMap = null;
- // handle with metric
- boolean isMetricEntity = GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService());
- double[] metricValueArray = null;
-
- for (Map.Entry<String, byte[]> entry : allQualifierValues.entrySet()) {
- if (ed.isTag(entry.getKey())) {
- if (entry.getValue() != null) {
- logTags.put(entry.getKey(), new String(entry.getValue()));
- }else if (TokenConstant.isExpression(entry.getKey())){
- if(doubleMap == null) doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed);
- // Caculate expression based fields
- String expression = TokenConstant.parseExpressionContent(entry.getKey());
- if (extra == null) extra = new HashMap<String, Object>();
-
- // Evaluation expression as output based on entity
- // -----------------------------------------------
- // 1) Firstly, check whether is metric entity and expression requires value and also value is not number (i.e. double[])
- // 2) Treat all required fields as double, if not number, then set result as NaN
-
- try {
- ExpressionParser parser = ExpressionParser.parse(expression);
- boolean isRequiringValue = parser.getDependentFields().contains(GenericMetricEntity.VALUE_FIELD);
-
- if(isMetricEntity && isRequiringValue && doubleMap.get(GenericMetricEntity.VALUE_FIELD)!=null
- && Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) // EntityQualifierUtils will convert non-number field into Double.NaN
- {
- // if dependent fields require "value"
- // and value exists but value's type is double[] instead of double
-
- // handle with metric value array based expression
- // lazily extract metric value as double array if required
- if(metricValueArray == null){
- // if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){
- Qualifier qualifier = ed.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD);
- EntitySerDeser serDeser = qualifier.getSerDeser();
- if(serDeser instanceof DoubleArraySerDeser){
- byte[] value = allQualifierValues.get(qualifier.getQualifierName());
- if(value !=null ) metricValueArray = (double[]) serDeser.deserialize(value);
- }
- // }
- }
-
- if(metricValueArray!=null){
- double[] resultBucket = new double[metricValueArray.length];
- Map<String, Double> _doubleMap = new HashMap<String,Double>(doubleMap);
- _doubleMap.remove(entry.getKey());
- for(int i=0;i< resultBucket.length;i++) {
- _doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]);
- resultBucket[i]= parser.eval(_doubleMap);
- }
- extra.put(expression,resultBucket);
- }else{
- LOG.warn("Failed convert metric value into double[] type which is required by expression: "+expression);
- // if require value in double[] is NaN
- double value = parser.eval(doubleMap);
- extra.put(expression, value);
- }
- }else {
- double value = parser.eval(doubleMap);
- extra.put(expression, value);
- // LOG.info("DEBUG: "+entry.getKey()+" = "+ value);
- }
- } catch (Exception e) {
- LOG.error("Failed to eval expression "+expression+", exception: "+e.getMessage(),e);
- }
- }
- } else {
- logQualifierValues.put(entry.getKey(),entry.getValue());
- }
- }
- log.setQualifierValues(logQualifierValues);
- log.setTags(logTags);
- log.setExtraValues(extra);
- return log;
- }
-
- public static TaggedLogAPIEntity buildEntity(InternalLog log, EntityDefinition entityDef) throws Exception {
- Map<String, byte[]> qualifierValues = log.getQualifierValues();
- TaggedLogAPIEntity entity = ENTITY_SERDESER.readValue(qualifierValues, entityDef);
- if (entity.getTags() == null && log.getTags() != null) {
- entity.setTags(log.getTags());
- }
- entity.setExp(log.getExtraValues());
- entity.setTimestamp(log.getTimestamp());
- entity.setEncodedRowkey(log.getEncodedRowkey());
- entity.setPrefix(log.getPrefix());
- return entity;
- }
-
- public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> logs, EntityDefinition entityDef) throws Exception {
- final List<TaggedLogAPIEntity> result = new ArrayList<TaggedLogAPIEntity>(logs.size());
- for (InternalLog log : logs) {
- result.add(buildEntity(log, entityDef));
- }
- return result;
- }
-
- public static byte[][] getOutputQualifiers(EntityDefinition entityDef, List<String> outputFields) {
- final byte[][] result = new byte[outputFields.size()][];
- int index = 0;
- for(String field : outputFields){
- // convert displayName to qualifierName
- Qualifier q = entityDef.getDisplayNameMap().get(field);
- if(q == null){ // for tag case
- result[index++] = field.getBytes();
- }else{ // for qualifier case
- result[index++] = q.getQualifierName().getBytes();
- }
- }
- return result;
- }
-
- public static InternalLog convertToInternalLog(TaggedLogAPIEntity entity, EntityDefinition entityDef) throws Exception {
- final InternalLog log = new InternalLog();
- final Map<String, String> inputTags = entity.getTags();
- final Map<String, String> tags = new TreeMap<String, String>();
- if(inputTags!=null) {
- for (Map.Entry<String, String> entry : inputTags.entrySet()) {
- tags.put(entry.getKey(), entry.getValue());
- }
- }
- log.setTags(tags);
- if(entityDef.isTimeSeries()){
- log.setTimestamp(entity.getTimestamp());
- }else{
- log.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, then actually stored 0
- }
-
- // For Metric entity, prefix is populated along with entity instead of EntityDefinition
- if(entity.getPrefix() != null && !entity.getPrefix().isEmpty()){
- log.setPrefix(entity.getPrefix());
- }else{
- log.setPrefix(entityDef.getPrefix());
- }
-
- log.setPartitions(entityDef.getPartitions());
- EntitySerDeserializer des = new EntitySerDeserializer();
- log.setQualifierValues(des.writeValue(entity, entityDef));
-
- final IndexDefinition[] indexDefs = entityDef.getIndexes();
- if (indexDefs != null) {
- final List<byte[]> indexRowkeys = new ArrayList<byte[]>();
- for (int i = 0; i < indexDefs.length; ++i) {
- final IndexDefinition indexDef = indexDefs[i];
- final byte[] indexRowkey = indexDef.generateIndexRowkey(entity);
- indexRowkeys.add(indexRowkey);
- }
- log.setIndexRowkeys(indexRowkeys);
- }
- return log;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseInternalLogHelper.class);
+
+ private static final EntitySerDeserializer ENTITY_SERDESER = new EntitySerDeserializer();
+
+ /**
+ * @param ed
+ * @param r
+ * @param qualifiers if null, return all qualifiers defined in ed
+ * @return
+ */
+ public static InternalLog parse(EntityDefinition ed, Result r, byte[][] qualifiers) {
+ final byte[] row = r.getRow();
+ // skip the first 4 bytes : prefix
+ final int offset = (ed.getPartitions() == null) ? (4) : (4 + ed.getPartitions().length * 4);
+ long timestamp = ByteUtil.bytesToLong(row, offset);
+ // reverse timestamp
+ timestamp = Long.MAX_VALUE - timestamp;
+ final byte[] family = ed.getColumnFamily().getBytes();
+ final Map<String, byte[]> allQualifierValues = new HashMap<String, byte[]>();
+
+ if (qualifiers != null) {
+ int count = qualifiers.length;
+ final byte[][] values = new byte[count][];
+ for (int i = 0; i < count; i++) {
+ // TODO if returned value is null, it means no this column for this row, so why set null to
+ // the object?
+ values[i] = r.getValue(family, qualifiers[i]);
+ allQualifierValues.put(new String(qualifiers[i]), values[i]);
+ }
+ } else {
+ // return all qualifiers
+ for (KeyValue kv : r.list()) {
+ byte[] qualifier = kv.getQualifier();
+ byte[] value = kv.getValue();
+ allQualifierValues.put(new String(qualifier), value);
+ }
+ }
+ final InternalLog log = buildObject(ed, row, timestamp, allQualifierValues);
+ return log;
+ }
+
+ /**
+ * @param ed
+ * @param row
+ * @param timestamp
+ * @param allQualifierValues
+ * <code>Map < Qualifier name (not display name),Value in bytes array ></code>
+ * @return
+ */
+ public static InternalLog buildObject(EntityDefinition ed, byte[] row, long timestamp,
+ Map<String, byte[]> allQualifierValues) {
+ InternalLog log = new InternalLog();
+ String myRow = EagleBase64Wrapper.encodeByteArray2URLSafeString(row);
+ log.setEncodedRowkey(myRow);
+ log.setPrefix(ed.getPrefix());
+ log.setTimestamp(timestamp);
+
+ Map<String, byte[]> logQualifierValues = new HashMap<String, byte[]>();
+ Map<String, String> logTags = new HashMap<String, String>();
+ Map<String, Object> extra = null;
+
+ Map<String, Double> doubleMap = null;
+ // handle with metric
+ boolean isMetricEntity = GenericMetricEntity.GENERIC_METRIC_SERVICE.equals(ed.getService());
+ double[] metricValueArray = null;
+
+ for (Map.Entry<String, byte[]> entry : allQualifierValues.entrySet()) {
+ if (ed.isTag(entry.getKey())) {
+ if (entry.getValue() != null) {
+ logTags.put(entry.getKey(), new String(entry.getValue()));
+ } else if (TokenConstant.isExpression(entry.getKey())) {
+ if (doubleMap == null) {
+ doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(allQualifierValues, ed);
+ }
+ // Caculate expression based fields
+ String expression = TokenConstant.parseExpressionContent(entry.getKey());
+ if (extra == null) {
+ extra = new HashMap<String, Object>();
+ }
+
+ // Evaluation expression as output based on entity
+ // -----------------------------------------------
+ // 1) Firstly, check whether is metric entity and expression requires value and also value
+ // is not number (i.e. double[])
+ // 2) Treat all required fields as double, if not number, then set result as NaN
+
+ try {
+ ExpressionParser parser = ExpressionParser.parse(expression);
+ boolean isRequiringValue = parser.getDependentFields()
+ .contains(GenericMetricEntity.VALUE_FIELD);
+
+ if (isMetricEntity && isRequiringValue
+ && doubleMap.get(GenericMetricEntity.VALUE_FIELD) != null
+ && Double.isNaN(doubleMap.get(GenericMetricEntity.VALUE_FIELD))) {
+ // EntityQualifierUtils will convert non-number field into Double.NaN
+ // if dependent fields require "value"
+ // and value exists but value's type is double[] instead of double
+
+ // handle with metric value array based expression
+ // lazily extract metric value as double array if required
+ if (metricValueArray == null) {
+ // if(allQualifierValues.containsKey(GenericMetricEntity.VALUE_FIELD)){
+ Qualifier qualifier = ed.getDisplayNameMap()
+ .get(GenericMetricEntity.VALUE_FIELD);
+ EntitySerDeser serDeser = qualifier.getSerDeser();
+ if (serDeser instanceof DoubleArraySerDeser) {
+ byte[] value = allQualifierValues.get(qualifier.getQualifierName());
+ if (value != null) {
+ metricValueArray = (double[])serDeser.deserialize(value);
+ }
+ }
+ // }
+ }
+
+ if (metricValueArray != null) {
+ double[] resultBucket = new double[metricValueArray.length];
+ Map<String, Double> _doubleMap = new HashMap<String, Double>(doubleMap);
+ _doubleMap.remove(entry.getKey());
+ for (int i = 0; i < resultBucket.length; i++) {
+ _doubleMap.put(GenericMetricEntity.VALUE_FIELD, metricValueArray[i]);
+ resultBucket[i] = parser.eval(_doubleMap);
+ }
+ extra.put(expression, resultBucket);
+ } else {
+ LOG.warn("Failed convert metric value into double[] type which is required by expression: "
+ + expression);
+ // if require value in double[] is NaN
+ double value = parser.eval(doubleMap);
+ extra.put(expression, value);
+ }
+ } else {
+ double value = parser.eval(doubleMap);
+ extra.put(expression, value);
+ // LOG.info("DEBUG: "+entry.getKey()+" = "+ value);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to eval expression " + expression + ", exception: "
+ + e.getMessage(), e);
+ }
+ }
+ } else {
+ logQualifierValues.put(entry.getKey(), entry.getValue());
+ }
+ }
+ log.setQualifierValues(logQualifierValues);
+ log.setTags(logTags);
+ log.setExtraValues(extra);
+ return log;
+ }
+
+ public static TaggedLogAPIEntity buildEntity(InternalLog log, EntityDefinition entityDef)
+ throws Exception {
+ Map<String, byte[]> qualifierValues = log.getQualifierValues();
+ TaggedLogAPIEntity entity = ENTITY_SERDESER.readValue(qualifierValues, entityDef);
+ if (entity.getTags() == null && log.getTags() != null) {
+ entity.setTags(log.getTags());
+ }
+ entity.setExp(log.getExtraValues());
+ entity.setTimestamp(log.getTimestamp());
+ entity.setEncodedRowkey(log.getEncodedRowkey());
+ entity.setPrefix(log.getPrefix());
+ return entity;
+ }
+
+ public static List<TaggedLogAPIEntity> buildEntities(List<InternalLog> logs, EntityDefinition entityDef)
+ throws Exception {
+ final List<TaggedLogAPIEntity> result = new ArrayList<TaggedLogAPIEntity>(logs.size());
+ for (InternalLog log : logs) {
+ result.add(buildEntity(log, entityDef));
+ }
+ return result;
+ }
+
+ public static byte[][] getOutputQualifiers(EntityDefinition entityDef, List<String> outputFields) {
+ final byte[][] result = new byte[outputFields.size()][];
+ int index = 0;
+ for (String field : outputFields) {
+ // convert displayName to qualifierName
+ Qualifier q = entityDef.getDisplayNameMap().get(field);
+ if (q == null) { // for tag case
+ result[index++] = field.getBytes();
+ } else { // for qualifier case
+ result[index++] = q.getQualifierName().getBytes();
+ }
+ }
+ return result;
+ }
+
+ public static InternalLog convertToInternalLog(TaggedLogAPIEntity entity, EntityDefinition entityDef)
+ throws Exception {
+ final InternalLog log = new InternalLog();
+ final Map<String, String> inputTags = entity.getTags();
+ final Map<String, String> tags = new TreeMap<String, String>();
+ if (inputTags != null) {
+ for (Map.Entry<String, String> entry : inputTags.entrySet()) {
+ tags.put(entry.getKey(), entry.getValue());
+ }
+ }
+ log.setTags(tags);
+ if (entityDef.isTimeSeries()) {
+ log.setTimestamp(entity.getTimestamp());
+ } else {
+ log.setTimestamp(EntityConstants.FIXED_WRITE_TIMESTAMP); // set timestamp to MAX, then actually stored 0
+ }
+
+ // For Metric entity, prefix is populated along with entity instead of EntityDefinition
+ if (entity.getPrefix() != null && !entity.getPrefix().isEmpty()) {
+ log.setPrefix(entity.getPrefix());
+ } else {
+ log.setPrefix(entityDef.getPrefix());
+ }
+
+ log.setPartitions(entityDef.getPartitions());
+ EntitySerDeserializer des = new EntitySerDeserializer();
+ log.setQualifierValues(des.writeValue(entity, entityDef));
+
+ final IndexDefinition[] indexDefs = entityDef.getIndexes();
+ if (indexDefs != null) {
+ final List<byte[]> indexRowkeys = new ArrayList<byte[]>();
+ for (int i = 0; i < indexDefs.length; ++i) {
+ final IndexDefinition indexDef = indexDefs[i];
+ final byte[] indexRowkey = indexDef.generateIndexRowkey(entity);
+ indexRowkeys.add(indexRowkey);
+ }
+ log.setIndexRowkeys(indexRowkeys);
+ }
+ return log;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
index c8b9a33..d5c8e2c 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogReader2.java
@@ -28,59 +28,62 @@ import java.util.Date;
import java.util.List;
public class HBaseLogReader2 extends AbstractHBaseLogReader<InternalLog> {
- protected ResultScanner rs;
+ protected ResultScanner rs;
- public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] outputQualifiers) {
- super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers);
- }
+ public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime,
+ Filter filter, String lastScanKey, byte[][] outputQualifiers) {
+ super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers);
+ }
- /**
- * This constructor supports partition.
- *
- * @param ed entity definition
- * @param partitions partition values, which is sorted in partition definition order. TODO: in future we need to support
- * multiple values for one partition field
- * @param startTime start time of the query
- * @param endTime end time of the query
- * @param filter filter for the hbase scan
- * @param lastScanKey the key of last scan
- * @param outputQualifiers the bytes of output qualifier names
- * @param prefix can be populated from outside world specifically for generic metric reader
- */
- public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime, Filter filter, String lastScanKey, byte[][] outputQualifiers, String prefix) {
- super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix);
- }
+ /**
+ * This constructor supports partition.
+ *
+ * @param ed entity definition
+ * @param partitions partition values, which is sorted in partition definition order. TODO: in future we
+ * need to support multiple values for one partition field
+ * @param startTime start time of the query
+ * @param endTime end time of the query
+ * @param filter filter for the hbase scan
+ * @param lastScanKey the key of last scan
+ * @param outputQualifiers the bytes of output qualifier names
+ * @param prefix can be populated from outside world specifically for generic metric reader
+ */
+ public HBaseLogReader2(EntityDefinition ed, List<String> partitions, Date startTime, Date endTime,
+ Filter filter, String lastScanKey, byte[][] outputQualifiers, String prefix) {
+ super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix);
+ }
- @Override
- protected void onOpen(HTableInterface tbl, Scan scan) throws IOException {
- rs = tbl.getScanner(scan);
- }
+ @Override
+ protected void onOpen(HTableInterface tbl, Scan scan) throws IOException {
+ rs = tbl.getScanner(scan);
+ }
- /**
- * <h2>Close:</h2>
- * 1. Call super.close(): release current table connection <br></br>
- * 2. Close Scanner<br></br>
- *
- * @throws IOException
- */
- @Override
- public void close() throws IOException {
- super.close();
- if(rs != null){
- rs.close();
- }
- }
+ /**
+ * <h2>Close:</h2> 1. Call super.close(): release current table connection <br>
+ * <br>
+ * 2. Close Scanner<br>
+ * <br>
+ *
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ super.close();
+ if (rs != null) {
+ rs.close();
+ }
+ }
- @Override
- public InternalLog read() throws IOException {
- if (rs == null)
- throw new IllegalArgumentException(
- "ResultScanner must be initialized before reading");
- InternalLog t = null;
- Result r = rs.next();
- if (r != null) {
- t = HBaseInternalLogHelper.parse(_ed, r, qualifiers);
- }
- return t;
- }
+ @Override
+ public InternalLog read() throws IOException {
+ if (rs == null) {
+ throw new IllegalArgumentException("ResultScanner must be initialized before reading");
+ }
+ InternalLog t = null;
+ Result r = rs.next();
+ if (r != null) {
+ t = HBaseInternalLogHelper.parse(ed, r, qualifiers);
+ }
+ return t;
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
index 059ee7f..1cf23b6 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/HBaseLogWriter.java
@@ -29,124 +29,125 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HBaseLogWriter implements LogWriter {
- private static Logger LOG = LoggerFactory.getLogger(HBaseLogWriter.class);
- private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes();
-
- private HTableInterface tbl;
- private String table;
- private String columnFamily;
-
- public HBaseLogWriter(String table, String columnFamily) {
- // TODO assert for non-null of table and columnFamily
- this.table = table;
- this.columnFamily = columnFamily;
- }
-
- @Override
- public void open() throws IOException {
- try{
- tbl = EagleConfigFactory.load().getHTable(this.table);
-// LOGGER.info("HBase table " + table + " audo reflush is " + (tbl.isAutoFlush() ? "enabled" : "disabled"));
- }catch(Exception ex){
- LOG.error("Cannot create htable", ex);
- throw new IOException(ex);
- }
- }
-
- @Override
- public void close() throws IOException {
- if(tbl != null){
- new HTableFactory().releaseHTableInterface(tbl);
- }
- }
-
- @Override
- public void flush() throws IOException {
- tbl.flushCommits();
- }
-
- protected void populateColumnValues(Put p, InternalLog log){
- Map<String, byte[]> qualifierValues = log.getQualifierValues();
- // iterate all qualifierValues
- for(Map.Entry<String, byte[]> entry : qualifierValues.entrySet()){
- p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue());
- }
-
- Map<String, String> tags = log.getTags();
- // iterate all tags, each tag will be stored as a column qualifier
- if(tags != null){
- for(Map.Entry<String, String> entry : tags.entrySet()){
- // TODO need a consistent handling of null values
- if(entry.getValue() != null)
- p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue().getBytes());
- }
- }
- }
-
- /**
- * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
- */
- @Override
- public byte[] write(InternalLog log) throws IOException{
- final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
- final Put p = new Put(rowkey);
- populateColumnValues(p, log);
- tbl.put(p);
- final List<byte[]> indexRowkeys = log.getIndexRowkeys();
- if (indexRowkeys != null) {
- writeIndexes(rowkey, indexRowkeys);
- }
- return rowkey;
- }
-
- /**
- * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
- */
- public List<byte[]> write(List<InternalLog> logs) throws IOException{
- final List<Put> puts = new ArrayList<Put>(logs.size());
- final List<byte[]> result = new ArrayList<byte[]>(logs.size());
- for (InternalLog log : logs) {
- final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
- final Put p = new Put(rowkey);
- populateColumnValues(p, log);
- puts.add(p);
- final List<byte[]> indexRowkeys = log.getIndexRowkeys();
- if (indexRowkeys != null) {
- writeIndexes(rowkey, indexRowkeys, puts);
- }
- result.add(rowkey);
- }
- tbl.put(puts);
- return result;
- }
-
- @Override
- public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException{
- Put p = new Put(rowkey);
- populateColumnValues(p, log);
- tbl.put(p);
- final List<byte[]> indexRowkeys = log.getIndexRowkeys();
- if (indexRowkeys != null) {
- writeIndexes(rowkey, indexRowkeys);
- }
- }
-
- private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) throws IOException {
- for (byte[] indexRowkey : indexRowkeys) {
- Put p = new Put(indexRowkey);
- p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
- tbl.put(p);
- }
- }
-
- private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, List<Put> puts) throws IOException {
- for (byte[] indexRowkey : indexRowkeys) {
- Put p = new Put(indexRowkey);
- p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
- puts.add(p);
-// tbl.put(p);
- }
- }
-
-
+ private static Logger LOG = LoggerFactory.getLogger(HBaseLogWriter.class);
+ private static byte[] EMPTY_INDEX_QUALIFER_VALUE = "".getBytes();
+
+ private HTableInterface tbl;
+ private String table;
+ private String columnFamily;
+
+ public HBaseLogWriter(String table, String columnFamily) {
+ // TODO assert for non-null of table and columnFamily
+ this.table = table;
+ this.columnFamily = columnFamily;
+ }
+
+ @Override
+ public void open() throws IOException {
+ try {
+ tbl = EagleConfigFactory.load().getHTable(this.table);
+ // LOGGER.info("HBase table " + table + " audo reflush is " + (tbl.isAutoFlush() ? "enabled" :
+ // "disabled"));
+ } catch (Exception ex) {
+ LOG.error("Cannot create htable", ex);
+ throw new IOException(ex);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (tbl != null) {
+ new HTableFactory().releaseHTableInterface(tbl);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ tbl.flushCommits();
+ }
+
+ protected void populateColumnValues(Put p, InternalLog log) {
+ Map<String, byte[]> qualifierValues = log.getQualifierValues();
+ // iterate all qualifierValues
+ for (Map.Entry<String, byte[]> entry : qualifierValues.entrySet()) {
+ p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue());
+ }
+
+ Map<String, String> tags = log.getTags();
+ // iterate all tags, each tag will be stored as a column qualifier
+ if (tags != null) {
+ for (Map.Entry<String, String> entry : tags.entrySet()) {
+ // TODO need a consistent handling of null values
+ if (entry.getValue() != null) {
+ p.add(columnFamily.getBytes(), entry.getKey().getBytes(), entry.getValue().getBytes());
+ }
+ }
+ }
+ }
+
+ /**
+ * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
+ */
+ @Override
+ public byte[] write(InternalLog log) throws IOException {
+ final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
+ final Put p = new Put(rowkey);
+ populateColumnValues(p, log);
+ tbl.put(p);
+ final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+ if (indexRowkeys != null) {
+ writeIndexes(rowkey, indexRowkeys);
+ }
+ return rowkey;
+ }
+
+ /**
+ * TODO need think about if multi-PUT is necessary, by checking if autoFlush works
+ */
+ public List<byte[]> write(List<InternalLog> logs) throws IOException {
+ final List<Put> puts = new ArrayList<Put>(logs.size());
+ final List<byte[]> result = new ArrayList<byte[]>(logs.size());
+ for (InternalLog log : logs) {
+ final byte[] rowkey = RowkeyBuilder.buildRowkey(log);
+ final Put p = new Put(rowkey);
+ populateColumnValues(p, log);
+ puts.add(p);
+ final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+ if (indexRowkeys != null) {
+ writeIndexes(rowkey, indexRowkeys, puts);
+ }
+ result.add(rowkey);
+ }
+ tbl.put(puts);
+ return result;
+ }
+
+ @Override
+ public void updateByRowkey(byte[] rowkey, InternalLog log) throws IOException {
+ Put p = new Put(rowkey);
+ populateColumnValues(p, log);
+ tbl.put(p);
+ final List<byte[]> indexRowkeys = log.getIndexRowkeys();
+ if (indexRowkeys != null) {
+ writeIndexes(rowkey, indexRowkeys);
+ }
+ }
+
+ private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys) throws IOException {
+ for (byte[] indexRowkey : indexRowkeys) {
+ Put p = new Put(indexRowkey);
+ p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
+ tbl.put(p);
+ }
+ }
+
+ private void writeIndexes(byte[] rowkey, List<byte[]> indexRowkeys, List<Put> puts) throws IOException {
+ for (byte[] indexRowkey : indexRowkeys) {
+ Put p = new Put(indexRowkey);
+ p.add(columnFamily.getBytes(), rowkey, EMPTY_INDEX_QUALIFER_VALUE);
+ puts.add(p);
+ // tbl.put(p);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
index 8276640..066401f 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/InternalLog.java
@@ -25,115 +25,134 @@ import java.util.Map;
* TODO we should decouple BaseLog during write time and BaseLog during read time
*/
public class InternalLog {
- private String encodedRowkey;
- private String prefix;
- private String[] partitions;
- private long timestamp;
- private Map<String, byte[]> qualifierValues;
-
- private Map<String,Object> extraValues;
- private Map<String, String> tags;
- private Map<String, List<String>> searchTags;
- private List<byte[]> indexRowkeys;
-
- public String getEncodedRowkey() {
- return encodedRowkey;
- }
-
- public void setEncodedRowkey(String encodedRowkey) {
- this.encodedRowkey = encodedRowkey;
- }
-
- public Map<String, byte[]> getQualifierValues() {
- return qualifierValues;
- }
- public void setQualifierValues(Map<String, byte[]> qualifierValues) {
- this.qualifierValues = qualifierValues;
- }
-
- public Map<String, List<String>> getSearchTags() {
- return searchTags;
- }
- public void setSearchTags(Map<String, List<String>> searchTags) {
- this.searchTags = searchTags;
- }
- public String getPrefix() {
- return prefix;
- }
- public void setPrefix(String prefix) {
- this.prefix = prefix;
- }
- public String[] getPartitions() {
- return partitions;
- }
- public void setPartitions(String[] partitions) {
- this.partitions = partitions;
- }
- public long getTimestamp() {
- return timestamp;
- }
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
- public Map<String, String> getTags() {
- return tags;
- }
- public void setTags(Map<String, String> tags) {
- this.tags = tags;
- }
- public List<byte[]> getIndexRowkeys() {
- return indexRowkeys;
- }
- public void setIndexRowkeys(List<byte[]> indexRowkeys) {
- this.indexRowkeys = indexRowkeys;
- }
- public Map<String, Object> getExtraValues() { return extraValues; }
- public void setExtraValues(Map<String, Object> extraValues) { this.extraValues = extraValues; }
-
- public String toString(){
- StringBuffer sb = new StringBuffer();
- sb.append(prefix);
- sb.append("|");
- sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp));
- sb.append("(");
- sb.append(timestamp);
- sb.append(")");
- sb.append("|searchTags:");
- if(searchTags != null){
- for(String tagkey : searchTags.keySet()){
- sb.append(tagkey);
- sb.append('=');
- List<String> tagValues = searchTags.get(tagkey);
- sb.append("(");
- for(String tagValue : tagValues){
- sb.append(tagValue);
- sb.append(",");
- }
- sb.append(")");
- sb.append(",");
- }
- }
- sb.append("|tags:");
- if(tags != null){
- for(Map.Entry<String, String> entry : tags.entrySet()){
- sb.append(entry.getKey());
- sb.append("=");
- sb.append(entry.getValue());
- sb.append(",");
- }
- }
- sb.append("|columns:");
- if(qualifierValues != null){
- for(String qualifier : qualifierValues.keySet()){
- byte[] value = qualifierValues.get(qualifier);
- sb.append(qualifier);
- sb.append("=");
- if(value != null){
- sb.append(new String(value));
- }
- sb.append(",");
- }
- }
- return sb.toString();
- }
+ private String encodedRowkey;
+ private String prefix;
+ private String[] partitions;
+ private long timestamp;
+ private Map<String, byte[]> qualifierValues;
+
+ private Map<String, Object> extraValues;
+ private Map<String, String> tags;
+ private Map<String, List<String>> searchTags;
+ private List<byte[]> indexRowkeys;
+
+ public String getEncodedRowkey() {
+ return encodedRowkey;
+ }
+
+ public void setEncodedRowkey(String encodedRowkey) {
+ this.encodedRowkey = encodedRowkey;
+ }
+
+ public Map<String, byte[]> getQualifierValues() {
+ return qualifierValues;
+ }
+
+ public void setQualifierValues(Map<String, byte[]> qualifierValues) {
+ this.qualifierValues = qualifierValues;
+ }
+
+ public Map<String, List<String>> getSearchTags() {
+ return searchTags;
+ }
+
+ public void setSearchTags(Map<String, List<String>> searchTags) {
+ this.searchTags = searchTags;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+
+ public void setPrefix(String prefix) {
+ this.prefix = prefix;
+ }
+
+ public String[] getPartitions() {
+ return partitions;
+ }
+
+ public void setPartitions(String[] partitions) {
+ this.partitions = partitions;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public Map<String, String> getTags() {
+ return tags;
+ }
+
+ public void setTags(Map<String, String> tags) {
+ this.tags = tags;
+ }
+
+ public List<byte[]> getIndexRowkeys() {
+ return indexRowkeys;
+ }
+
+ public void setIndexRowkeys(List<byte[]> indexRowkeys) {
+ this.indexRowkeys = indexRowkeys;
+ }
+
+ public Map<String, Object> getExtraValues() {
+ return extraValues;
+ }
+
+ public void setExtraValues(Map<String, Object> extraValues) {
+ this.extraValues = extraValues;
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append(prefix);
+ sb.append("|");
+ sb.append(DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timestamp));
+ sb.append("(");
+ sb.append(timestamp);
+ sb.append(")");
+ sb.append("|searchTags:");
+ if (searchTags != null) {
+ for (String tagkey : searchTags.keySet()) {
+ sb.append(tagkey);
+ sb.append('=');
+ List<String> tagValues = searchTags.get(tagkey);
+ sb.append("(");
+ for (String tagValue : tagValues) {
+ sb.append(tagValue);
+ sb.append(",");
+ }
+ sb.append(")");
+ sb.append(",");
+ }
+ }
+ sb.append("|tags:");
+ if (tags != null) {
+ for (Map.Entry<String, String> entry : tags.entrySet()) {
+ sb.append(entry.getKey());
+ sb.append("=");
+ sb.append(entry.getValue());
+ sb.append(",");
+ }
+ }
+ sb.append("|columns:");
+ if (qualifierValues != null) {
+ for (String qualifier : qualifierValues.keySet()) {
+ byte[] value = qualifierValues.get(qualifier);
+ sb.append(qualifier);
+ sb.append("=");
+ if (value != null) {
+ sb.append(new String(value));
+ }
+ sb.append(",");
+ }
+ }
+ return sb.toString();
+ }
}