Posted to user@flume.apache.org by khadar basha <kh...@gmail.com> on 2012/07/27 14:49:23 UTC

Log4JAppender Ignoring the Logging Pattern. Getting only description.

Hi,

I am using Flume 1.2 with an Avro source and an HDFS sink. The application
server sends its log messages to Flume through log4j, using
org.apache.flume.clients.log4jappender.Log4jAppender in MyApp.

However, only the body of the message (the description) arrives. The time,
thread, and level information is lost.

flume-conf.properties file
==================
agent2Test1.sources = seqGenSrc
agent2Test1.channels = memoryChannel
agent2Test1.sinks = loggerSink

# For each one of the sources, the type is defined
agent2Test1.sources.seqGenSrc.type = avro
agent2Test1.sources.seqGenSrc.bind=localhost
agent2Test1.sources.seqGenSrc.port=41414

# interceptors for host and date
agent2Test1.sources.seqGenSrc.interceptors = time hostInterceptor
agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
agent2Test1.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# The channel can be defined as follows.
agent2Test1.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent2Test1.sinks.loggerSink.type = hdfs
agent2Test1.sinks.loggerSink.hdfs.path = hdfs://hadoopHost:8020/data/%Y/%m/%d/%{host}/Logs

agent2Test1.sinks.loggerSink.hdfs.fileType = DataStream

#Specify the channel the sink should use
agent2Test1.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
agent2Test1.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent2Test1.channels.memoryChannel.capacity = 1000
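
The hdfs.path above relies on Flume's escape sequences: %Y, %m and %d are filled in from the event's "timestamp" header (added here by the TimestampInterceptor) and %{host} from the "host" header (added by the HostInterceptor). The following is a rough, JDK-only sketch of that expansion, not Flume's actual implementation. The class name, the method name, the sample host value and the choice of UTC are assumptions made for illustration only.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flume: expands %Y, %m, %d and %{header}
// in a path template using the event headers.
public class HdfsPathSketch {

    private static final Pattern HEADER_REF = Pattern.compile("%\\{([^}]+)\\}");

    public static String expand(String path, Map<String, String> headers) {
        // The "timestamp" header holds milliseconds since the epoch.
        long millis = Long.parseLong(headers.get("timestamp"));
        // Assumption: UTC is used here only to keep the result deterministic.
        ZonedDateTime t = Instant.ofEpochMilli(millis).atZone(ZoneOffset.UTC);

        // Expand the date escapes first.
        String dated = path
                .replace("%Y", String.format("%04d", t.getYear()))
                .replace("%m", String.format("%02d", t.getMonthValue()))
                .replace("%d", String.format("%02d", t.getDayOfMonth()));

        // Then substitute %{name} with the matching header value
        // (an empty string when the header is missing).
        Matcher m = HEADER_REF.matcher(dated);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = headers.getOrDefault(m.group(1), "");
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("timestamp",
                String.valueOf(Instant.parse("2012-07-27T14:49:23Z").toEpochMilli()));
        headers.put("host", "appserver01"); // sample value, not from the thread
        System.out.println(
                expand("hdfs://hadoopHost:8020/data/%Y/%m/%d/%{host}/Logs", headers));
    }
}
```

Note that only the headers influence the directory layout; the content written to the file is still whatever the event body contains.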



Sample java program to generate the log message:
=====================================


package com.test;


import org.apache.flume.clients.log4jappender.Log4jAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.MDC;
import org.apache.log4j.PatternLayout;

import java.util.UUID;


public class Main {
    static Logger log = Logger.getLogger(Main.class);

    public static void main(String[] args) {
        try {
            Log4jAppender appender = new Log4jAppender();
            appender.setHostname("localhost");
            appender.setPort(41414);
            appender.setLayout(new PatternLayout("%d [%c] (%t) <%X{user} %X{field}> %m"));
            // appender.setReconnectAttempts(100);

            appender.activateOptions();

            log.addAppender(appender);

            MDC.put("user", "chris");
            // while (true) {
            MDC.put("field", UUID.randomUUID().toString());
            log.info("=====> Hello World");
            try {
                throw new Exception("Testing");
            } catch (Exception e) {
                log.error("Gone wrong ===>", e);
            }
            // }
            System.in.read();
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}



Am I missing any config here?



-- 
Thanks,
Khadar

Re: Log4JAppender Ignoring the Logging Pattern. Getting only description.

Posted by Hari Shreedharan <hs...@cloudera.com>.
Thanks for the information. It would be great if you could submit this for review. We will review it, commit it, and get it into Flume itself.

Thanks,
Hari

-- 
Hari Shreedharan


On Thursday, August 9, 2012 at 10:46 PM, khadar basha wrote:

> Hi All,
> 
> Finally I am able to send the logs in log4j format. I have modified Log4jAppender.java to put the formatted message into the FlumeEvent's body.
> I verified it programmatically and it works fine. It also works fine with log4j.xml, but with log4j.properties the format is not picked up. I am not sure whether anything more needs to be done.
> 
> 
> Thanks,
> Khadar
> 
> 
> 
> On Fri, Jul 27, 2012 at 8:55 PM, Ralph Goers <ralph.goers@dslextreme.com> wrote:
> > Khadar,
> > 
> > I am not sure if your reply was meant as a response to my comment or just as additional information. The Log4j 2 FlumeAppender is not part of Flume. See http://logging.apache.org/log4j/2.x/manual/appenders.html#FlumeAvroAppender. However, Log4j 2 is still waiting for its first release so you will have to build it yourself if you want to try it. 
> > 
> > Ralph
> > 
> > On Jul 27, 2012, at 6:58 AM, khadar basha wrote:
> > > FlumeLog4jAvroAppender is available as part of 0.9.4, but it is not available in 1.2. Log4jAppender is from flume-ng-log4jappender-1.2.0.jar.
> > > 
> > > I think it was replaced as part of 1.2.
> > > 
> > > On Fri, Jul 27, 2012 at 6:31 PM, Ralph Goers <rgoers@apache.org> wrote:
> > > > You might consider looking at Log4j 2. It has a Flume Appender that records the whole formatted log message in the body. In addition, it will record the MDC fields as well.
> > > > 
> > > > Sent from my iPad 
> > > > 
> > > 
> > > 
> > > 
> > > -- 
> > > Thanks,
> > > Khadar
> > > 
> > 
> 
> 
> 
> -- 
> Thanks,
> Khadar
> 


Re: Log4JAppender Ignoring the Logging Pattern. Getting only description.

Posted by khadar basha <kh...@gmail.com>.
Hi All,

Finally I am able to send the logs in log4j format. I have modified
Log4jAppender.java to put the formatted message into the FlumeEvent's
body.
I verified it programmatically and it works fine. It also works fine with
log4j.xml, but with log4j.properties the format is not picked up. I am not
sure whether anything more needs to be done.
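
A likely reason for the log4j.properties behaviour (not confirmed anywhere in this thread): log4j 1.2's PropertyConfigurator reads an appender's layout settings only when the appender's requiresLayout() returns true, while the XML configurator applies a <layout> element unconditionally. The modified appender below still returns false from requiresLayout(). The toy model that follows uses only the JDK to illustrate that control flow. ToyAppender and configureFromProperties are hypothetical stand-ins, not log4j classes, and real log4j does considerably more.

```java
import java.util.Properties;

// Toy model only: none of these types come from log4j.
public class RequiresLayoutSketch {

    // Stand-in for an appender; layoutPattern stays null when no layout is applied.
    public static class ToyAppender {
        private final boolean requiresLayout;
        public String layoutPattern;

        public ToyAppender(boolean requiresLayout) {
            this.requiresLayout = requiresLayout;
        }

        public boolean requiresLayout() {
            return requiresLayout;
        }
    }

    // Mimics the assumed properties-file behaviour: the layout key is read
    // only when the appender reports that it requires a layout.
    public static void configureFromProperties(ToyAppender appender, String name,
                                               Properties props) {
        if (appender.requiresLayout()) {
            appender.layoutPattern = props.getProperty(
                    "log4j.appender." + name + ".layout.ConversionPattern");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("log4j.appender.out2.layout.ConversionPattern",
                "%d [%c] (%t) %m");

        ToyAppender asPosted = new ToyAppender(false); // like requiresLayout() in this post
        ToyAppender changed = new ToyAppender(true);
        configureFromProperties(asPosted, "out2", props);
        configureFromProperties(changed, "out2", props);

        System.out.println("requiresLayout=false -> " + asPosted.layoutPattern);
        System.out.println("requiresLayout=true  -> " + changed.layoutPattern);
    }
}
```

If that is indeed the cause, returning true from requiresLayout() in the modified appender would be the change to try.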

Modified Log4jAppender.java
=====================
package org.apache.flume.clients.log4jappender;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

/**
 *
 * Appends Log4j Events to an external Flume client which is described by
 * the Log4j configuration file. The appender takes two required parameters:
 *<p>
 *<strong>Hostname</strong> : This is the hostname of the first hop
 *at which Flume (through an AvroSource) is listening for events.
 *</p>
 *<p>
 *<strong>Port</strong> : This is the port on the above host where the Flume
 *Source is listening for events.
 *</p>
 *A sample log4j properties file which appends to a source would look like:
 *<pre><p>
 *log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender
 *log4j.appender.out2.Port = 25430
 *log4j.appender.out2.Hostname = foobarflumesource.com
 *log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</p></pre>
 *<p><i>Note: Change the last line to the package of the class(es), that will
 *do the appending. For example if classes from the package
 *com.bar.foo are appending, the last line would be:</i></p>
 *<pre><p>log4j.logger.com.bar.foo = DEBUG,out2</p></pre>
 *
 *
 */
public class Log4jAppender extends AppenderSkeleton {

  private String hostname;
  private int port;
  private RpcClient rpcClient = null;



  /**
   * If this constructor is used programmatically rather than from a log4j conf
   * you must set the <tt>port</tt> and <tt>hostname</tt> and then call
   * <tt>activateOptions()</tt> before calling <tt>append()</tt>.
   */
  public Log4jAppender(){
    super();
    System.out.println("Inside Constructor"+layout);
  }

  /**
   * Sets the hostname and port. Even if these are passed the
   * <tt>activateOptions()</tt> function must be called before calling
   * <tt>append()</tt>, else <tt>append()</tt> will throw an Exception.
   * @param hostname The first hop where the client should connect to.
   * @param port The port to connect on the host.
   *
   */
  public Log4jAppender(String hostname, int port){
    this.hostname = hostname;
    this.port = port;
    System.out.println("Inside Constructor from "+layout);
  }


  public Log4jAppender(String hostname, int port, Layout layout){
    this.hostname = hostname;
    this.port = port;
    this.layout = layout;
  }

  /**
   * Append the LoggingEvent, to send to the first Flume hop.
   * @param event The LoggingEvent to be appended to the flume.
   * @throws FlumeException if the appender was closed,
   * or the hostname and port were not setup, there was a timeout, or there
   * was a connection error.
   */
  @Override
  public synchronized void append(LoggingEvent event) throws FlumeException{
    //If rpcClient is null, it means either this appender object was never
    //setup by setting hostname and port and then calling activateOptions
    //or this appender object was closed by calling close(), so we throw an
    //exception to show the appender is no longer accessible.
    if(rpcClient == null){
      throw new FlumeException("Cannot Append to Appender!" +
          "Appender either closed or not setup correctly!");
    }

    if(!rpcClient.isActive()){
      reconnect();
    }

    //Client created first time append is called.
    Map<String, String> hdrs = new HashMap<String, String>();
    hdrs.put(Log4jAvroHeaders.LOGGER_NAME.toString(), event.getLoggerName());
    hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
        String.valueOf(event.getTimeStamp()));

    //To get the level back simply use
    //Level.toLevel(Integer.parseInt(
    //hdrs.get(Log4jAvroHeaders.LOG_LEVEL.toString())))
    hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
        String.valueOf(event.getLevel().toInt()));
    hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");

    // Use the layout's formatted output as the event body when a layout is
    // set; otherwise fall back to the raw message.
    String body;
    if (layout != null) {
      StringBuilder sb = new StringBuilder(layout.format(event));
      // Layouts that ignore the throwable do not render the stack trace,
      // so append its lines here, separated by newlines.
      if (layout.ignoresThrowable()) {
        String[] s = event.getThrowableStrRep();
        if (s != null && s.length > 0) {
          for (String line : s) {
            sb.append(line).append('\n');
          }
          sb.setLength(sb.length() - 1); // drop the trailing newline
        }
      }
      body = sb.toString();
    } else {
      System.out.println("====== Layout is NULL ======");
      body = event.getMessage().toString();
    }

    Event flumeEvent = EventBuilder.withBody(body, Charset.forName("UTF8"), hdrs);

    try {
      rpcClient.append(flumeEvent);
    } catch (EventDeliveryException e) {
      String msg = "Flume append() failed.";
      LogLog.error(msg);
      throw new FlumeException(msg + " Exception follows.", e);
    }
  }

  //This function should be synchronized to make sure one thread
  //does not close an appender another thread is using, and hence risking
  //a null pointer exception.
  /**
   * Closes underlying client.
   * If <tt>append()</tt> is called after this function is called,
   * it will throw an exception.
   * @throws FlumeException if errors occur during close
   */
  @Override
  public synchronized void close() throws FlumeException{
    //Any append calls after this will result in an Exception.
    if (rpcClient != null) {
      rpcClient.close();
      rpcClient = null;
    }
  }

  @Override
  public boolean requiresLayout() {
    return false;
  }

  /**
   * Set the first flume hop hostname.
   * @param hostname The first hop where the client should connect to.
   */
  public void setHostname(String hostname){
    this.hostname = hostname;
  }

  /**
   * Set the port on the hostname to connect to.
   * @param port The port to connect on the host.
   */
  public void setPort(int port){
    this.port = port;
  }

  /**
   * Activate the options set using <tt>setPort()</tt>
   * and <tt>setHostname()</tt>
   * @throws FlumeException if the <tt>hostname</tt> and
   *  <tt>port</tt> combination is invalid.
   */
  @Override
  public void activateOptions() throws FlumeException{
    try {
      rpcClient = RpcClientFactory.getDefaultInstance(hostname, port);
    } catch (FlumeException e) {
      String errormsg = "RPC client creation failed! " +
          e.getMessage();
      LogLog.error(errormsg);
      throw e;
    }
  }

  /**
   * Make it easy to reconnect on failure
   * @throws FlumeException
   */
  private void reconnect() throws FlumeException {
    close();
    activateOptions();
  }

}
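
The body-building step in append() above can be tried in isolation. The sketch below is not Flume or log4j code: it takes plain strings in place of layout.format(event) and event.getThrowableStrRep(), and the class name, method name and sample values are made up for illustration.

```java
// Hypothetical stand-alone version of the body-building step; JDK only.
public class EventBodySketch {

    // formatted: what the layout would return for the event.
    // throwableLines: the stack trace lines, or null when nothing was thrown.
    public static String buildBody(String formatted, String[] throwableLines) {
        // An empty array is treated like null here: nothing to append.
        if (throwableLines == null || throwableLines.length == 0) {
            return formatted;
        }
        // Lines are joined with '\n' and no trailing newline is left behind.
        return formatted + String.join("\n", throwableLines);
    }

    public static void main(String[] args) {
        String formatted = "[com.test.Main] (main) Gone wrong ===>"; // sample text
        String[] trace = {
            "java.lang.Exception: Testing",
            "\tat com.test.Main.main(Main.java)"
        };
        System.out.println(buildBody(formatted, trace));
    }
}
```

Because the pattern used in this thread does not end with %n, the first stack trace line follows the formatted message directly on the same line. Ending the pattern with %n would put the trace on its own line.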


Program to send the log messages:
==========================
public void test() {
    try {
        Log4jAppender appender = new Log4jAppender();
        appender.setHostname("flume_agent_host");
        appender.setPort(41414);
        appender.setLayout(new PatternLayout("%d [%c] (%t) <%X{user} %X{field}> %m"));
        // appender.setReconnectAttempts(100);

        appender.activateOptions();

        log.addAppender(appender);

        MDC.put("user", "chris");
        // while (true) {
        MDC.put("field", UUID.randomUUID().toString());
        log.info("=====> Hello World");
        try {
            throw new Exception("Testing");
        } catch (Exception e) {
            log.error("Gone wrong ===>", e);
        }
        // }
        System.in.read();
        System.in.read();
    } catch (Exception e) {
        e.printStackTrace();
    }
}



Thanks,
Khadar



On Fri, Jul 27, 2012 at 8:55 PM, Ralph Goers <ra...@dslextreme.com> wrote:

> Khadar,
>
> I am not sure if your reply was meant as a response to my comment or just
> as additional information. The Log4j 2 FlumeAppender is not part of Flume.
> See
> http://logging.apache.org/log4j/2.x/manual/appenders.html#FlumeAvroAppender.
> However, Log4j 2 is still waiting for its first release so you will have to
> build it yourself if you want to try it.
>
> Ralph
>
> On Jul 27, 2012, at 6:58 AM, khadar basha wrote:
>
> FlumeLog4jAvroAppender is available as part of 0.9.4, but it is not available
> in 1.2. Log4jAppender is from flume-ng-log4jappender-1.2.0.jar.
>
> I think it was replaced as part of 1.2.
>
> On Fri, Jul 27, 2012 at 6:31 PM, Ralph Goers <rg...@apache.org> wrote:
>
>> You might consider looking at Log4j 2. It has a Flume Appender that
>> records the whole formatted log message in the body. In addition, it will
>> record the MDC fields as well.
>>
>> Sent from my iPad
>>
>
>
> --
> Thanks,
> Khadar
>
>
>


-- 
Thanks,
Khadar

Re: Log4JAppender Ignoring the Logging Pattern. Getting only description.

Posted by Ralph Goers <ra...@dslextreme.com>.
Khadar,

I am not sure if your reply was meant as a response to my comment or just as additional information. The Log4j 2 FlumeAppender is not part of Flume. See http://logging.apache.org/log4j/2.x/manual/appenders.html#FlumeAvroAppender. However, Log4j 2 is still waiting for its first release so you will have to build it yourself if you want to try it.

Ralph

On Jul 27, 2012, at 6:58 AM, khadar basha wrote:

> FlumeLog4jAvroAppender is available as part of 0.94, but it is not available in 1.2. Log4jAppender is from flume-ng-log4jappender-1.2.0.jar.
> 
> I think it was replaced as part of 1.2.


Re: Log4JAppender Ignoring the Logging Pattern. Getting only description.

Posted by khadar basha <kh...@gmail.com>.
FlumeLog4jAvroAppender is available as part of 0.94, but it is not available
in 1.2. Log4jAppender is from flume-ng-log4jappender-1.2.0.jar.

I think it was replaced as part of 1.2.

On Fri, Jul 27, 2012 at 6:31 PM, Ralph Goers <rg...@apache.org> wrote:

> You might consider looking at Log4j 2. It has a Flume Appender that
> records the whole formatted log message in the body. In addition, it will
> record the MDC fields as well.


-- 
Thanks,
Khadar

Re: Log4JAppender Ignoring the Logging Pattern. Getting only description.

Posted by Ralph Goers <rg...@apache.org>.
You might consider looking at Log4j 2. It has a Flume Appender that records the whole formatted log message in the body. In addition, it will record the MDC fields as well.
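
To make the difference concrete, here is a minimal plain-Java sketch of the idea: the fully formatted line goes into the event body, and the MDC fields travel alongside it as headers. It deliberately uses neither the Flume nor the Log4j API. SketchEvent, formatLine and toEvent are hypothetical names made up for this illustration, and the timestamp format is just an assumption, so treat it as a picture of the event shape rather than as what either appender actually does internally.

```java
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

public class FormattedEventSketch {

    /** Hypothetical stand-in for an event: string headers plus a byte body. */
    static final class SketchEvent {
        final Map<String, String> headers;
        final byte[] body;

        SketchEvent(Map<String, String> headers, byte[] body) {
            this.headers = headers;
            this.body = body;
        }
    }

    /** Renders a line shaped like the pattern "%d [%c] (%t) <%X{user} %X{field}> %m". */
    static String formatLine(Date when, String logger, String thread,
                             Map<String, String> mdc, String message) {
        // Timestamp format picked for illustration only (an assumption).
        String ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS").format(when);
        return String.format("%s [%s] (%s) <%s %s> %s", ts, logger, thread,
                mdc.getOrDefault("user", ""), mdc.getOrDefault("field", ""), message);
    }

    /** Puts the formatted line in the body and copies the MDC into the headers. */
    static SketchEvent toEvent(Date when, String logger, String thread,
                               Map<String, String> mdc, String message) {
        String line = formatLine(when, logger, thread, mdc, message);
        return new SketchEvent(new LinkedHashMap<>(mdc),
                line.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<String, String> mdc = new LinkedHashMap<>();
        mdc.put("user", "chris");
        mdc.put("field", "f-123");

        SketchEvent event = toEvent(new Date(), "com.test.Main",
                Thread.currentThread().getName(), mdc, "=====> Hello World");

        System.out.println("headers = " + event.headers);
        System.out.println("body    = " + new String(event.body, StandardCharsets.UTF_8));
    }
}
```

With a body like this, a sink that writes only the event body would still keep the time, logger and thread, because they are already part of the text.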

Sent from my iPad
