You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by tb...@apache.org on 2013/06/15 02:43:32 UTC

svn commit: r1493291 [1/2] - in /incubator/ambari/trunk/ambari-server/src: main/java/org/apache/ambari/server/controller/ganglia/ main/java/org/apache/ambari/server/controller/internal/ main/resources/ test/java/org/apache/ambari/server/controller/gang...

Author: tbeerbower
Date: Sat Jun 15 00:43:32 2013
New Revision: 1493291

URL: http://svn.apache.org/r1493291
Log:
Initial check in of FLUME metrics

Added:
    incubator/ambari/trunk/ambari-server/src/test/resources/flume_ganglia_data.txt
Modified:
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProvider.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractPropertyProvider.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BaseProvider.java
    incubator/ambari/trunk/ambari-server/src/main/resources/ganglia_properties.json
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProviderTest.java

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProvider.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProvider.java?rev=1493291&r1=1493290&r2=1493291&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProvider.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProvider.java Sat Jun 15 00:43:32 2013
@@ -29,6 +29,8 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Abstract property provider implementation for a Ganglia source.
@@ -51,16 +53,17 @@ public abstract class GangliaPropertyPro
   public static final Map<String, String> GANGLIA_CLUSTER_NAME_MAP = new HashMap<String, String>();
 
   static {
-    GANGLIA_CLUSTER_NAME_MAP.put("NAMENODE", "HDPNameNode");
-    GANGLIA_CLUSTER_NAME_MAP.put("DATANODE", "HDPSlaves");
-    GANGLIA_CLUSTER_NAME_MAP.put("JOBTRACKER", "HDPJobTracker");
-    GANGLIA_CLUSTER_NAME_MAP.put("TASKTRACKER", "HDPSlaves");
-    GANGLIA_CLUSTER_NAME_MAP.put("RESOURCEMANAGER", "HDPResourceManager");
-    GANGLIA_CLUSTER_NAME_MAP.put("NODEMANAGER", "HDPSlaves");
-    GANGLIA_CLUSTER_NAME_MAP.put("HISTORYSERVER", "HDPHistoryServer");
-    GANGLIA_CLUSTER_NAME_MAP.put("HBASE_MASTER", "HDPHBaseMaster");
-    GANGLIA_CLUSTER_NAME_MAP.put("HBASE_CLIENT", "HDPSlaves");
+    GANGLIA_CLUSTER_NAME_MAP.put("NAMENODE",           "HDPNameNode");
+    GANGLIA_CLUSTER_NAME_MAP.put("DATANODE",           "HDPSlaves");
+    GANGLIA_CLUSTER_NAME_MAP.put("JOBTRACKER",         "HDPJobTracker");
+    GANGLIA_CLUSTER_NAME_MAP.put("TASKTRACKER",        "HDPSlaves");
+    GANGLIA_CLUSTER_NAME_MAP.put("RESOURCEMANAGER",    "HDPResourceManager");
+    GANGLIA_CLUSTER_NAME_MAP.put("NODEMANAGER",        "HDPSlaves");
+    GANGLIA_CLUSTER_NAME_MAP.put("HISTORYSERVER",      "HDPHistoryServer");
+    GANGLIA_CLUSTER_NAME_MAP.put("HBASE_MASTER",       "HDPHBaseMaster");
+    GANGLIA_CLUSTER_NAME_MAP.put("HBASE_CLIENT",       "HDPSlaves");
     GANGLIA_CLUSTER_NAME_MAP.put("HBASE_REGIONSERVER", "HDPSlaves");
+    GANGLIA_CLUSTER_NAME_MAP.put("FLUME_SERVER",       "HDPSlaves");
   }
 
   protected final static Logger LOG =
@@ -208,7 +211,9 @@ public abstract class GangliaPropertyPro
             new ResourceKey(getHostName(resource), gangliaClusterName);
 
         for (String id : ids) {
-          Map<String, PropertyInfo> propertyInfoMap = getPropertyInfoMap(getComponentName(resource), id);
+          Map<String, PropertyInfo> propertyInfoMap = new HashMap<String, PropertyInfo>();
+
+          boolean containsRegExp = getPropertyInfoMap(getComponentName(resource), id, propertyInfoMap);
 
           for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
             String propertyId = entry.getKey();
@@ -219,7 +224,7 @@ public abstract class GangliaPropertyPro
             if ((temporalInfo == null && propertyInfo.isPointInTime()) || (temporalInfo != null && propertyInfo.isTemporal())) {
               RRDRequest rrdRequest = requests.get(temporalInfo);
               if (rrdRequest == null) {
-                rrdRequest = new RRDRequest(clusterName, temporalInfo);
+                rrdRequest = new RRDRequest(clusterName, temporalInfo, containsRegExp);
                 requests.put(temporalInfo, rrdRequest);
               }
               rrdRequest.putResource(key, resource);
@@ -355,11 +360,13 @@ public abstract class GangliaPropertyPro
     private final Map<String, Set<String>> metrics = new HashMap<String, Set<String>>();
     private final Set<String> clusterSet = new HashSet<String>();
     private final Set<String> hostSet = new HashSet<String>();
+    private final boolean containsRegExp;
 
 
-    private RRDRequest(String clusterName, TemporalInfo temporalInfo) {
+    private RRDRequest(String clusterName, TemporalInfo temporalInfo, boolean containsRegExp) {
       this.clusterName  = clusterName;
       this.temporalInfo = temporalInfo;
+      this.containsRegExp = containsRegExp;
     }
 
     public void putResource(ResourceKey key, Resource resource) {
@@ -392,7 +399,8 @@ public abstract class GangliaPropertyPro
      */
     public Collection<Resource> populateResources() throws SystemException {
 
-      String spec = getSpec(clusterName, clusterSet, hostSet, metrics.keySet(), temporalInfo);
+      String spec = getSpec(clusterName, clusterSet, hostSet,
+          containsRegExp ? Collections.<String>emptySet() : metrics.keySet(), temporalInfo);
       BufferedReader reader = null;
       try {
         reader = new BufferedReader(new InputStreamReader(
@@ -465,13 +473,42 @@ public abstract class GangliaPropertyPro
      * @param gangliaMetric  the Ganglia metrics
      */
     private void populateResource(Resource resource, GangliaMetric gangliaMetric) {
-      Set<String> propertyIdSet = metrics.get(gangliaMetric.getMetric_name());
+      String metric_name = gangliaMetric.getMetric_name();
+
+      Set<String> propertyIdSet = metrics.get(metric_name);
+      List<String> parameterList  = new LinkedList<String>();
+
+      if (propertyIdSet == null) {
+        for (Map.Entry<String, Set<String>> entry : metrics.entrySet()) {
+
+          String key = entry.getKey();
+
+          Pattern pattern = Pattern.compile(key);
+          Matcher matcher = pattern.matcher(metric_name);
+
+          if (matcher.matches()) {
+            propertyIdSet = entry.getValue();
+            // get parameters
+            for (int i = 0; i < matcher.groupCount(); ++i) {
+              parameterList.add(matcher.group(i + 1));
+            }
+            break;
+          }
+        }
+      }
       if (propertyIdSet != null) {
         Map<String, PropertyInfo> metricsMap = getComponentMetrics().get(getComponentName(resource));
         if (metricsMap != null) {
           for (String propertyId : propertyIdSet) {
             if (propertyId != null) {
               if (metricsMap.containsKey(propertyId)){
+                if (propertyId.matches(".*\\$\\d+.*")) {
+                  int i = 1;
+                  for (String param : parameterList) {
+                    propertyId = propertyId.replace("$" + i, param);
+                    ++i;
+                  }
+                }
                 resource.setProperty(propertyId, getValue(gangliaMetric, temporalInfo != null));
               }
             }

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractPropertyProvider.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractPropertyProvider.java?rev=1493291&r1=1493290&r2=1493291&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractPropertyProvider.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractPropertyProvider.java Sat Jun 15 00:43:32 2013
@@ -21,7 +21,6 @@ package org.apache.ambari.server.control
 import org.apache.ambari.server.controller.spi.PropertyProvider;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -73,25 +72,68 @@ public abstract class AbstractPropertyPr
    * @return a map of metrics
    */
   protected Map<String, PropertyInfo> getPropertyInfoMap(String componentName, String propertyId) {
+    Map<String, PropertyInfo> propertyInfoMap = new HashMap<String, PropertyInfo>();
+
+    getPropertyInfoMap(componentName, propertyId, propertyInfoMap);
+
+    return propertyInfoMap;
+  }
+
+  // TODO : added for flume and reg exp property ids... revisit.
+  protected boolean getPropertyInfoMap(String componentName, String propertyId, Map<String, PropertyInfo> propertyInfoMap) {
     Map<String, PropertyInfo> componentMetricMap = componentMetrics.get(componentName);
+
+    propertyInfoMap.clear();
+
     if (componentMetricMap == null) {
-      return Collections.emptyMap();
+      return false;
     }
 
     PropertyInfo propertyInfo = componentMetricMap.get(propertyId);
     if (propertyInfo != null) {
-      return Collections.singletonMap(propertyId, propertyInfo);
+      propertyInfoMap.put(propertyId, propertyInfo);
+      return false;
+    }
+
+    String regExpKey = getRegExpKey(propertyId);
+
+    if (regExpKey != null) {
+      propertyInfo = componentMetricMap.get(regExpKey);
+      if (propertyInfo != null) {
+        propertyInfoMap.put(regExpKey, propertyInfo);
+        return true;
+      }
     }
 
+    boolean containsRegExp = false;
+
     if (!propertyId.endsWith("/")){
       propertyId += "/";
     }
-    Map<String, PropertyInfo> propertyInfoMap = new HashMap<String, PropertyInfo>();
+
     for (Map.Entry<String, PropertyInfo> entry : componentMetricMap.entrySet()) {
       if (entry.getKey().startsWith(propertyId)) {
-        propertyInfoMap.put(entry.getKey(), entry.getValue());
+        String key = entry.getKey();
+        containsRegExp = isPatternKey(key);
+        propertyInfoMap.put(key, entry.getValue());
       }
     }
-    return propertyInfoMap;
+
+    if (regExpKey != null) {
+      if (!regExpKey.endsWith("/")){
+        regExpKey += "/";
+      }
+
+      for (Map.Entry<String, PropertyInfo> entry : componentMetricMap.entrySet()) {
+        if (entry.getKey().startsWith(regExpKey)) {
+          containsRegExp = true;
+          propertyInfoMap.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+
+    return containsRegExp;
   }
+
+
 }

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BaseProvider.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BaseProvider.java?rev=1493291&r1=1493290&r2=1493291&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BaseProvider.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BaseProvider.java Sat Jun 15 00:43:32 2013
@@ -27,9 +27,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Base provider implementation for both property and resource providers.
@@ -51,6 +54,9 @@ public abstract class BaseProvider {
    */
   private final Set<String> combinedIds;
 
+
+  private final Map<String, Pattern> patterns;
+
   /**
    * The logger.
    */
@@ -70,6 +76,13 @@ public abstract class BaseProvider {
     this.categoryIds = PropertyHelper.getCategories(propertyIds);
     this.combinedIds = new HashSet<String>(propertyIds);
     this.combinedIds.addAll(this.categoryIds);
+    this.patterns = new HashMap<String, Pattern>();
+    for (String id : this.combinedIds) {
+      if (id.matches(".*\\$\\d+.*")) {
+        String pattern = id.replaceAll("\\$\\d+", "\\\\S+");
+        patterns.put(id, Pattern.compile(pattern));
+      }
+    }
   }
 
 
@@ -106,12 +119,8 @@ public abstract class BaseProvider {
       // we want to treat property as a category and the entries as individual properties.
       Set<String> categoryProperties = new HashSet<String>();
       for (String unsupportedPropertyId : unsupportedPropertyIds) {
-        String category = PropertyHelper.getPropertyCategory(unsupportedPropertyId);
-        while (category != null) {
-          if (this.propertyIds.contains(category)) {
-            categoryProperties.add(unsupportedPropertyId);
-          }
-          category = PropertyHelper.getPropertyCategory(category);
+        if (checkCategory(unsupportedPropertyId) || checkRegExp(unsupportedPropertyId)) {
+          categoryProperties.add(unsupportedPropertyId);
         }
       }
       unsupportedPropertyIds.removeAll(categoryProperties);
@@ -148,16 +157,9 @@ public abstract class BaseProvider {
       Set<String> unsupportedPropertyIds = new HashSet<String>(propertyIds);
       unsupportedPropertyIds.removeAll(this.combinedIds);
 
-      // Add the categories to account for map properties where the entries will not be
-      // in the provider property list ids but the map (category) might be.
       for (String unsupportedPropertyId : unsupportedPropertyIds) {
-        String category = PropertyHelper.getPropertyCategory(unsupportedPropertyId);
-        while (category != null) {
-          if (this.propertyIds.contains(category)) {
-            keepers.add(unsupportedPropertyId);
-            break;
-          }
-          category = PropertyHelper.getPropertyCategory(category);
+        if (checkCategory(unsupportedPropertyId) || checkRegExp(unsupportedPropertyId)) {
+          keepers.add(unsupportedPropertyId);
         }
       }
       propertyIds.retainAll(this.combinedIds);
@@ -166,6 +168,50 @@ public abstract class BaseProvider {
     return propertyIds;
   }
 
+
+  /**
+   * Check the categories to account for map properties where the entries will not be
+   * in the provider property list ids but the map (category) might be.
+   */
+  private boolean checkCategory(String unsupportedPropertyId) {
+    String category = PropertyHelper.getPropertyCategory(unsupportedPropertyId);
+    while (category != null) {
+      if( this.propertyIds.contains(category)) {
+        return true;
+      }
+      category = PropertyHelper.getPropertyCategory(category);
+    }
+    return false;
+  }
+
+  private boolean checkRegExp(String unsupportedPropertyId) {
+    for (Pattern pattern : patterns.values()) {
+      Matcher matcher = pattern.matcher(unsupportedPropertyId);
+      if (matcher.matches()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  protected String getRegExpKey(String id) {
+    for (Map.Entry<String, Pattern> entry : patterns.entrySet()) {
+      Pattern pattern = entry.getValue();
+
+      Matcher matcher = pattern.matcher(id);
+
+      if (matcher.matches()) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+  protected boolean isPatternKey(String id) {
+    return patterns.containsKey(id);
+  }
+
+
   /**
    * Set a property value on the given resource for the given id and value.
    * Make sure that the id is in the given set of requested ids.

Modified: incubator/ambari/trunk/ambari-server/src/main/resources/ganglia_properties.json
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/resources/ganglia_properties.json?rev=1493291&r1=1493290&r2=1493291&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/resources/ganglia_properties.json (original)
+++ incubator/ambari/trunk/ambari-server/src/main/resources/ganglia_properties.json Sat Jun 15 00:43:32 2013
@@ -11811,8 +11811,294 @@
         "pointInTime":true,
         "temporal":true
       }
+    },
+    "FLUME_SERVER":{
+      "metrics/boottime":{
+        "metric":"boottime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_aidle":{
+        "metric":"cpu_aidle",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_idle":{
+        "metric":"cpu_idle",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_nice":{
+        "metric":"cpu_nice",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_num":{
+        "metric":"cpu_num",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_speed":{
+        "metric":"cpu_speed",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_system":{
+        "metric":"cpu_system",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_user":{
+        "metric":"cpu_user",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_wio":{
+        "metric":"cpu_wio",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/disk/disk_free":{
+        "metric":"disk_free",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/disk/disk_total":{
+        "metric":"disk_total",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/disk/part_max_used":{
+        "metric":"part_max_used",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/gcCount":{
+        "metric":"jvm.metrics.gcCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/gcTimeMillis":{
+        "metric":"jvm.metrics.gcTimeMillis",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/logError":{
+        "metric":"jvm.metrics.logError",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/logFatal":{
+        "metric":"jvm.metrics.logFatal",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/logInfo":{
+        "metric":"jvm.metrics.logInfo",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/logWarn":{
+        "metric":"jvm.metrics.logWarn",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/maxMemoryM":{
+        "metric":"jvm.metrics.maxMemoryM",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/memHeapCommittedM":{
+        "metric":"jvm.metrics.memHeapCommittedM",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/memHeapUsedM":{
+        "metric":"jvm.metrics.memHeapUsedM",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/memNonHeapCommittedM":{
+        "metric":"jvm.metrics.memNonHeapCommittedM",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/memNonHeapUsedM":{
+        "metric":"jvm.metrics.memNonHeapUsedM",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsBlocked":{
+        "metric":"jvm.metrics.threadsBlocked",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsNew":{
+        "metric":"jvm.metrics.threadsNew",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsRunnable":{
+        "metric":"jvm.metrics.threadsRunnable",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsTerminated":{
+        "metric":"jvm.metrics.threadsTerminated",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsTimedWaiting":{
+        "metric":"jvm.metrics.threadsTimedWaiting",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsWaiting":{
+        "metric":"jvm.metrics.threadsWaiting",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/ChannelCapacity":{
+        "metric":"(\\w+).CHANNEL.(\\w+).ChannelCapacity",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/StartTime":{
+        "metric":"(\\w+).CHANNEL.(\\w+).StartTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/EventTakeAttemptCount":{
+        "metric":"(\\w+).CHANNEL.(\\w+).EventTakeAttemptCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/EventTakeSuccessCount":{
+        "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/EventPutAttemptCount":{
+        "metric":"(\\w+).CHANNEL.(\\w+).EventPutAttemptCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/StopTime":{
+        "metric":"(\\w+).CHANNEL.(\\w+).StopTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/ChannelFillPercentage":{
+        "metric":"(\\w+).CHANNEL.(\\w+).ChannelFillPercentage",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/ChannelSize":{
+        "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/EventPutSuccessCount":{
+        "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/ConnectionCreatedCount":{
+        "metric":"(\\w+).SINK.(\\w+).ConnectionCreatedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/BatchCompleteCount":{
+        "metric":"(\\w+).SINK.(\\w+).BatchCompleteCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/EventDrainSuccessCount":{
+        "metric":"(\\w+).SINK.(\\w+).EventDrainSuccessCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/StartTime":{
+        "metric":"(\\w+).SINK.(\\w+).StartTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/EventDrainAttemptCount":{
+        "metric":"(\\w+).SINK.(\\w+).EventDrainAttemptCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/ConnectionFailedCount":{
+        "metric":"(\\w+).SINK.(\\w+).ConnectionFailedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/BatchUnderflowCount":{
+        "metric":"(\\w+).SINK.(\\w+).BatchUnderflowCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/ConnectionClosedCount":{
+        "metric":"(\\w+).SINK.(\\w+).ConnectionClosedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/StopTime":{
+        "metric":"(\\w+).SINK.(\\w+).StopTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/BatchEmptyCount":{
+        "metric":"(\\w+).SINK.(\\w+).BatchEmptyCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/AppendBatchReceivedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).AppendBatchReceivedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/AppendAcceptedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).AppendAcceptedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/StartTime":{
+        "metric":"(\\w+).SOURCE.(\\w+).StartTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/OpenConnectionCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).OpenConnectionCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/AppendBatchAcceptedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).AppendBatchAcceptedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/AppendReceivedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).AppendReceivedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/EventReceivedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).EventReceivedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/StopTime":{
+        "metric":"(\\w+).SOURCE.(\\w+).StopTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/EventAcceptedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).EventAcceptedCount",
+        "pointInTime":true,
+        "temporal":true
+      }
     }
-
   },
 
   "HostComponent":{
@@ -20618,6 +20904,293 @@
         "pointInTime":true,
         "temporal":true
       }
+    },
+    "FLUME_SERVER":{
+      "metrics/boottime":{
+        "metric":"boottime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_aidle":{
+        "metric":"cpu_aidle",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_idle":{
+        "metric":"cpu_idle",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_nice":{
+        "metric":"cpu_nice",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_num":{
+        "metric":"cpu_num",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_speed":{
+        "metric":"cpu_speed",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_system":{
+        "metric":"cpu_system",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_user":{
+        "metric":"cpu_user",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/cpu/cpu_wio":{
+        "metric":"cpu_wio",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/disk/disk_free":{
+        "metric":"disk_free",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/disk/disk_total":{
+        "metric":"disk_total",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/disk/part_max_used":{
+        "metric":"part_max_used",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/gcCount":{
+        "metric":"jvm.metrics.gcCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/gcTimeMillis":{
+        "metric":"jvm.metrics.gcTimeMillis",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/logError":{
+        "metric":"jvm.metrics.logError",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/logFatal":{
+        "metric":"jvm.metrics.logFatal",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/logInfo":{
+        "metric":"jvm.metrics.logInfo",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/logWarn":{
+        "metric":"jvm.metrics.logWarn",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/maxMemoryM":{
+        "metric":"jvm.metrics.maxMemoryM",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/memHeapCommittedM":{
+        "metric":"jvm.metrics.memHeapCommittedM",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/memHeapUsedM":{
+        "metric":"jvm.metrics.memHeapUsedM",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/memNonHeapCommittedM":{
+        "metric":"jvm.metrics.memNonHeapCommittedM",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/memNonHeapUsedM":{
+        "metric":"jvm.metrics.memNonHeapUsedM",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsBlocked":{
+        "metric":"jvm.metrics.threadsBlocked",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsNew":{
+        "metric":"jvm.metrics.threadsNew",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsRunnable":{
+        "metric":"jvm.metrics.threadsRunnable",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsTerminated":{
+        "metric":"jvm.metrics.threadsTerminated",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsTimedWaiting":{
+        "metric":"jvm.metrics.threadsTimedWaiting",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/jvm/threadsWaiting":{
+        "metric":"jvm.metrics.threadsWaiting",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/ChannelCapacity":{
+        "metric":"(\\w+).CHANNEL.(\\w+).ChannelCapacity",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/StartTime":{
+        "metric":"(\\w+).CHANNEL.(\\w+).StartTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/EventTakeAttemptCount":{
+        "metric":"(\\w+).CHANNEL.(\\w+).EventTakeAttemptCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/EventTakeSuccessCount":{
+        "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/EventPutAttemptCount":{
+        "metric":"(\\w+).CHANNEL.(\\w+).EventPutAttemptCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/StopTime":{
+        "metric":"(\\w+).CHANNEL.(\\w+).StopTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/ChannelFillPercentage":{
+        "metric":"(\\w+).CHANNEL.(\\w+).ChannelFillPercentage",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/ChannelSize":{
+        "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/CHANNEL/$2/EventPutSuccessCount":{
+        "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/ConnectionCreatedCount":{
+        "metric":"(\\w+).SINK.(\\w+).ConnectionCreatedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/BatchCompleteCount":{
+        "metric":"(\\w+).SINK.(\\w+).BatchCompleteCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/EventDrainSuccessCount":{
+        "metric":"(\\w+).SINK.(\\w+).EventDrainSuccessCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/StartTime":{
+        "metric":"(\\w+).SINK.(\\w+).StartTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/EventDrainAttemptCount":{
+        "metric":"(\\w+).SINK.(\\w+).EventDrainAttemptCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/ConnectionFailedCount":{
+        "metric":"(\\w+).SINK.(\\w+).ConnectionFailedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/BatchUnderflowCount":{
+        "metric":"(\\w+).SINK.(\\w+).BatchUnderflowCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/ConnectionClosedCount":{
+        "metric":"(\\w+).SINK.(\\w+).ConnectionClosedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/StopTime":{
+        "metric":"(\\w+).SINK.(\\w+).StopTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SINK/$2/BatchEmptyCount":{
+        "metric":"(\\w+).SINK.(\\w+).BatchEmptyCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/AppendBatchReceivedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).AppendBatchReceivedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/AppendAcceptedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).AppendAcceptedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/StartTime":{
+        "metric":"(\\w+).SOURCE.(\\w+).StartTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/OpenConnectionCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).OpenConnectionCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/AppendBatchAcceptedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).AppendBatchAcceptedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/AppendReceivedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).AppendReceivedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/EventReceivedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).EventReceivedCount",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/StopTime":{
+        "metric":"(\\w+).SOURCE.(\\w+).StopTime",
+        "pointInTime":true,
+        "temporal":true
+      },
+      "metrics/flume/$1/SOURCE/$2/EventAcceptedCount":{
+        "metric":"(\\w+).SOURCE.(\\w+).EventAcceptedCount",
+        "pointInTime":true,
+        "temporal":true
+      }
     }
   }
 }
\ No newline at end of file

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProviderTest.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProviderTest.java?rev=1493291&r1=1493290&r2=1493291&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProviderTest.java (original)
+++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProviderTest.java Sat Jun 15 00:43:32 2013
@@ -38,6 +38,11 @@ import java.util.Set;
 public class GangliaPropertyProviderTest {
 
   private static final String PROPERTY_ID = PropertyHelper.getPropertyId("metrics/jvm", "gcCount");
+  private static final String FLUME_CHANNEL_CAPACITY_PROPERTY = "metrics/flume/flume/CHANNEL/c1/ChannelCapacity";
+  private static final String FLUME_CATEGORY = "metrics/flume";
+  private static final String FLUME_CATEGORY2 = "metrics/flume/flume";
+  private static final String FLUME_CATEGORY3 = "metrics/flume/flume/CHANNEL";
+  private static final String FLUME_CATEGORY4 = "metrics/flume/flume/CHANNEL/c1";
   private static final String CLUSTER_NAME_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "cluster_name");
   private static final String HOST_NAME_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "host_name");
   private static final String COMPONENT_NAME_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "component_name");
@@ -191,6 +196,204 @@ public class GangliaPropertyProviderTest
 
   }
 
+
+  @Test
+  public void testPopulateResources_params() throws Exception {
+    TestStreamProvider streamProvider  = new TestStreamProvider("flume_ganglia_data.txt");
+    TestGangliaHostProvider hostProvider = new TestGangliaHostProvider();
+
+    GangliaPropertyProvider propertyProvider = new GangliaHostComponentPropertyProvider(
+        PropertyHelper.getGangliaPropertyIds(Resource.Type.HostComponent),
+        streamProvider,
+        hostProvider,
+        CLUSTER_NAME_PROPERTY_ID,
+        HOST_NAME_PROPERTY_ID,
+        COMPONENT_NAME_PROPERTY_ID);
+
+    // flume
+    Resource resource = new ResourceImpl(Resource.Type.HostComponent);
+
+    resource.setProperty(HOST_NAME_PROPERTY_ID, "ip-10-39-113-33.ec2.internal");
+    resource.setProperty(COMPONENT_NAME_PROPERTY_ID, "FLUME_SERVER");
+
+    // only ask for one property
+    Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
+    temporalInfoMap.put(FLUME_CHANNEL_CAPACITY_PROPERTY, new TemporalInfoImpl(10L, 20L, 1L));
+    Request  request = PropertyHelper.getReadRequest(Collections.singleton(FLUME_CHANNEL_CAPACITY_PROPERTY), temporalInfoMap);
+
+    Assert.assertEquals(1, propertyProvider.populateResources(Collections.singleton(resource), request, null).size());
+
+    Assert.assertEquals("http://domU-12-31-39-0E-34-E1.compute-1.internal/cgi-bin/rrd.py?c=HDPSlaves&h=ip-10-39-113-33.ec2.internal&s=10&e=20&r=1",
+        streamProvider.getLastSpec());
+
+    Assert.assertEquals(3, PropertyHelper.getProperties(resource).size());
+    Assert.assertNotNull(resource.getPropertyValue(FLUME_CHANNEL_CAPACITY_PROPERTY));
+  }
+
+
+  @Test
+  public void testPopulateResources_paramsAll() throws Exception {
+    TestStreamProvider streamProvider  = new TestStreamProvider("flume_ganglia_data.txt");
+    TestGangliaHostProvider hostProvider = new TestGangliaHostProvider();
+
+    GangliaPropertyProvider propertyProvider = new GangliaHostComponentPropertyProvider(
+        PropertyHelper.getGangliaPropertyIds(Resource.Type.HostComponent),
+        streamProvider,
+        hostProvider,
+        CLUSTER_NAME_PROPERTY_ID,
+        HOST_NAME_PROPERTY_ID,
+        COMPONENT_NAME_PROPERTY_ID);
+
+    // flume
+    Resource resource = new ResourceImpl(Resource.Type.HostComponent);
+
+    resource.setProperty(HOST_NAME_PROPERTY_ID, "ip-10-39-113-33.ec2.internal");
+    resource.setProperty(COMPONENT_NAME_PROPERTY_ID, "FLUME_SERVER");
+
+    Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
+    Request  request = PropertyHelper.getReadRequest(Collections.<String>emptySet(), temporalInfoMap);
+
+    Assert.assertEquals(1, propertyProvider.populateResources(Collections.singleton(resource), request, null).size());
+
+    Assert.assertEquals("http://domU-12-31-39-0E-34-E1.compute-1.internal/cgi-bin/rrd.py?c=HDPSlaves&h=ip-10-39-113-33.ec2.internal&e=now&pt=true",
+        streamProvider.getLastSpec());
+
+    Assert.assertEquals(33, PropertyHelper.getProperties(resource).size());
+    Assert.assertNotNull(resource.getPropertyValue(FLUME_CHANNEL_CAPACITY_PROPERTY));
+  }
+
+  @Test
+  public void testPopulateResources_params_category1() throws Exception {
+    TestStreamProvider streamProvider  = new TestStreamProvider("flume_ganglia_data.txt");
+    TestGangliaHostProvider hostProvider = new TestGangliaHostProvider();
+
+    GangliaPropertyProvider propertyProvider = new GangliaHostComponentPropertyProvider(
+        PropertyHelper.getGangliaPropertyIds(Resource.Type.HostComponent),
+        streamProvider,
+        hostProvider,
+        CLUSTER_NAME_PROPERTY_ID,
+        HOST_NAME_PROPERTY_ID,
+        COMPONENT_NAME_PROPERTY_ID);
+
+    // flume
+    Resource resource = new ResourceImpl(Resource.Type.HostComponent);
+
+    resource.setProperty(HOST_NAME_PROPERTY_ID, "ip-10-39-113-33.ec2.internal");
+    resource.setProperty(COMPONENT_NAME_PROPERTY_ID, "FLUME_SERVER");
+
+    // only ask for one property
+    Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
+    temporalInfoMap.put(FLUME_CATEGORY, new TemporalInfoImpl(10L, 20L, 1L));
+    Request  request = PropertyHelper.getReadRequest(Collections.singleton(FLUME_CATEGORY), temporalInfoMap);
+
+    Assert.assertEquals(1, propertyProvider.populateResources(Collections.singleton(resource), request, null).size());
+
+    Assert.assertEquals("http://domU-12-31-39-0E-34-E1.compute-1.internal/cgi-bin/rrd.py?c=HDPSlaves&h=ip-10-39-113-33.ec2.internal&s=10&e=20&r=1",
+        streamProvider.getLastSpec());
+
+    Assert.assertEquals(21, PropertyHelper.getProperties(resource).size());
+    Assert.assertNotNull(resource.getPropertyValue(FLUME_CHANNEL_CAPACITY_PROPERTY));
+  }
+
+  @Test
+  public void testPopulateResources_params_category2() throws Exception {
+    TestStreamProvider streamProvider  = new TestStreamProvider("flume_ganglia_data.txt");
+    TestGangliaHostProvider hostProvider = new TestGangliaHostProvider();
+
+    GangliaPropertyProvider propertyProvider = new GangliaHostComponentPropertyProvider(
+        PropertyHelper.getGangliaPropertyIds(Resource.Type.HostComponent),
+        streamProvider,
+        hostProvider,
+        CLUSTER_NAME_PROPERTY_ID,
+        HOST_NAME_PROPERTY_ID,
+        COMPONENT_NAME_PROPERTY_ID);
+
+    // flume
+    Resource resource = new ResourceImpl(Resource.Type.HostComponent);
+
+    resource.setProperty(HOST_NAME_PROPERTY_ID, "ip-10-39-113-33.ec2.internal");
+    resource.setProperty(COMPONENT_NAME_PROPERTY_ID, "FLUME_SERVER");
+
+    // only ask for one property
+    Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
+    temporalInfoMap.put(FLUME_CATEGORY2, new TemporalInfoImpl(10L, 20L, 1L));
+    Request  request = PropertyHelper.getReadRequest(Collections.singleton(FLUME_CATEGORY2), temporalInfoMap);
+
+    Assert.assertEquals(1, propertyProvider.populateResources(Collections.singleton(resource), request, null).size());
+
+    Assert.assertEquals("http://domU-12-31-39-0E-34-E1.compute-1.internal/cgi-bin/rrd.py?c=HDPSlaves&h=ip-10-39-113-33.ec2.internal&s=10&e=20&r=1",
+        streamProvider.getLastSpec());
+
+    Assert.assertEquals(21, PropertyHelper.getProperties(resource).size());
+    Assert.assertNotNull(resource.getPropertyValue(FLUME_CHANNEL_CAPACITY_PROPERTY));
+  }
+
+  @Test
+  public void testPopulateResources_params_category3() throws Exception {
+    TestStreamProvider streamProvider  = new TestStreamProvider("flume_ganglia_data.txt");
+    TestGangliaHostProvider hostProvider = new TestGangliaHostProvider();
+
+    GangliaPropertyProvider propertyProvider = new GangliaHostComponentPropertyProvider(
+        PropertyHelper.getGangliaPropertyIds(Resource.Type.HostComponent),
+        streamProvider,
+        hostProvider,
+        CLUSTER_NAME_PROPERTY_ID,
+        HOST_NAME_PROPERTY_ID,
+        COMPONENT_NAME_PROPERTY_ID);
+
+    // flume
+    Resource resource = new ResourceImpl(Resource.Type.HostComponent);
+
+    resource.setProperty(HOST_NAME_PROPERTY_ID, "ip-10-39-113-33.ec2.internal");
+    resource.setProperty(COMPONENT_NAME_PROPERTY_ID, "FLUME_SERVER");
+
+    // only ask for one property
+    Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
+    temporalInfoMap.put(FLUME_CATEGORY3, new TemporalInfoImpl(10L, 20L, 1L));
+    Request  request = PropertyHelper.getReadRequest(Collections.singleton(FLUME_CATEGORY3), temporalInfoMap);
+
+    Assert.assertEquals(1, propertyProvider.populateResources(Collections.singleton(resource), request, null).size());
+
+    Assert.assertEquals("http://domU-12-31-39-0E-34-E1.compute-1.internal/cgi-bin/rrd.py?c=HDPSlaves&h=ip-10-39-113-33.ec2.internal&s=10&e=20&r=1",
+        streamProvider.getLastSpec());
+
+    Assert.assertEquals(11, PropertyHelper.getProperties(resource).size());
+    Assert.assertNotNull(resource.getPropertyValue(FLUME_CHANNEL_CAPACITY_PROPERTY));
+  }
+
+  @Test
+  public void testPopulateResources_params_category4() throws Exception {
+    TestStreamProvider streamProvider  = new TestStreamProvider("flume_ganglia_data.txt");
+    TestGangliaHostProvider hostProvider = new TestGangliaHostProvider();
+
+    GangliaPropertyProvider propertyProvider = new GangliaHostComponentPropertyProvider(
+        PropertyHelper.getGangliaPropertyIds(Resource.Type.HostComponent),
+        streamProvider,
+        hostProvider,
+        CLUSTER_NAME_PROPERTY_ID,
+        HOST_NAME_PROPERTY_ID,
+        COMPONENT_NAME_PROPERTY_ID);
+
+    // flume
+    Resource resource = new ResourceImpl(Resource.Type.HostComponent);
+
+    resource.setProperty(HOST_NAME_PROPERTY_ID, "ip-10-39-113-33.ec2.internal");
+    resource.setProperty(COMPONENT_NAME_PROPERTY_ID, "FLUME_SERVER");
+
+    // only ask for one property
+    Map<String, TemporalInfo> temporalInfoMap = new HashMap<String, TemporalInfo>();
+    temporalInfoMap.put(FLUME_CATEGORY4, new TemporalInfoImpl(10L, 20L, 1L));
+    Request  request = PropertyHelper.getReadRequest(Collections.singleton(FLUME_CATEGORY4), temporalInfoMap);
+
+    Assert.assertEquals(1, propertyProvider.populateResources(Collections.singleton(resource), request, null).size());
+
+    Assert.assertEquals("http://domU-12-31-39-0E-34-E1.compute-1.internal/cgi-bin/rrd.py?c=HDPSlaves&h=ip-10-39-113-33.ec2.internal&s=10&e=20&r=1",
+        streamProvider.getLastSpec());
+
+    Assert.assertEquals(11, PropertyHelper.getProperties(resource).size());
+    Assert.assertNotNull(resource.getPropertyValue(FLUME_CHANNEL_CAPACITY_PROPERTY));
+  }
+
   private static class TestGangliaHostProvider implements GangliaHostProvider {
 
     @Override