You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/08 04:30:27 UTC

[GitHub] doc987 commented on issue #6004: Automatically Determine Ingestion Schema

doc987 commented on issue #6004: Automatically Determine Ingestion Schema
URL: https://github.com/apache/incubator-druid/issues/6004#issuecomment-411281495
 
 
   That gets part of the way there, though the data provided by Collectd and Telegraf are formatted somewhat differently.  That means that either the an output transform would be needed for those programs, or an input transform for Druid.
   
   The documentation page for the [Collectd JSON format](https://collectd.org/wiki/index.php/JSON) is currently down.  The Telegraf JSON format documentation is [here](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md).
   
   Some sample code is shown below to convert JSON from Telegraf to JSON for Druid.  Due to the flattening of the JSON, there are some tag names (metric, name, timestamp, and value) that could result in name collisions, and are therefore dropped (if they exist) in the code below.  Another possibility would be to try and determine an alternative name.
   
   Any opinions on a good approach to take?  For example:
   1. Should Druid be able to read the Telegraf JSON format directly?  Since it's a rather generic format, it could also be used by other applications.
   2. Should the Telegraf format be flattened to the JSON format that Druid already reads?  If so, should the code be part of Druid, or separate?
   
   The Druid [documentation](http://druid.io/docs/0.12.1/development/modules.html) mentions that ```io.druid.data.input.impl.InputRowParser``` should be extended for an input parser.
   
   ```java
   import java.util.*;
   
   import com.fasterxml.jackson.core.JsonProcessingException;
   import com.fasterxml.jackson.databind.ObjectMapper;
   import com.fasterxml.jackson.databind.JsonNode;
   import com.fasterxml.jackson.databind.node.ObjectNode;
   
   //given String json from Telegraf, it might be converted to JSON input for Druid as follows
   Telegraf_Flatten tf = new Telegraf_Flatten();
   JsonNode node = tf.json_string_to_node(json);
   json = tf.flatten(node);
   
   class Telegraf_Flatten {
      
      public JsonNode json_string_to_node(String json){
         ObjectMapper mapper = new ObjectMapper();
         JsonNode node = null;
         try{
            node = mapper.readTree(json);
         }catch(java.io.IOException e){
            e.printStackTrace();
         }
         return node;
      }
      
      public String flatten(JsonNode node){
         String json = null;
         JsonNode metrics = node.path("metrics");
         if(metrics.isMissingNode()) json = flatten_single(node);
         else json = this.flatten_batch(metrics);
         return json;
      }
      
      public String flatten_batch(JsonNode metrics){
         String json = "";
         for(JsonNode item : metrics){
            json += this.flatten_single(item);
         }
         return json;
      }
      
      public String flatten_single(JsonNode root){
         
         //extract nested json
         JsonNode fields = root.get("fields");
         JsonNode tags   = root.path("tags");
         
         ((ObjectNode) root).remove("fields");
         
         //flatten tags
         if(!tags.isMissingNode()){
            ((ObjectNode) root).remove("tags");
            Iterator<Map.Entry<String,JsonNode>> iter = tags.fields();
            while(iter.hasNext()){
               Map.Entry<String,JsonNode> entry = iter.next();
               if(root.path(entry.getKey()).isMissingNode()){
                  ((ObjectNode) root).set(entry.getKey(),entry.getValue());
               }
            }
         }
         
         //split into one JsonNode object per field
         Iterator<Map.Entry<String,JsonNode>> iter = fields.fields();
         int metrics = fields.size();
         JsonNode[] row = new JsonNode[metrics];
         for(int i=0;iter.hasNext();i++){
            Map.Entry<String,JsonNode> entry = iter.next();
            row[i] = root.deepCopy();
            ((ObjectNode) row[i]).put("metric",entry.getKey());
            ((ObjectNode) row[i]).set("value",entry.getValue());
         }
         
         //convert JsonNode(s) to JSON String
         ObjectMapper mapper = new ObjectMapper();
         String json = "";
         for(int i=0;i<metrics;i++){
            try{
               json += mapper.writeValueAsString(row[i]) + "\n";
            }catch(JsonProcessingException e){
               e.printStackTrace();
            }
         }
         
         return json;
      }
      
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org