Posted to dev@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2015/03/31 19:07:07 UTC

HBase TableOutputFormat fix (Flink 0.8.1)

Hi Flink devs,
this is my final report on the HBaseOutputFormat problem (with Flink
0.8.1), and I hope you can suggest the best way to make a PR:

1) The following code produces the error reported below (this should be
fixed in 0.9, right?)

      Job job = Job.getInstance();
      myDataset.output(new HadoopOutputFormat<Text, Mutation>(
          new TableOutputFormat<Text>(), job));

org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and
abstract classes are not valid types: class org.apache.hadoop.hbase.client.Mutation
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:376)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:296)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:224)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:152)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:79)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:160)

2) So I created a custom HBaseTableOutputFormat (see at the end of the mail;
it is basically a copy of the HBase TableOutputFormat, but declared with the
concrete Put value type instead of the abstract Mutation) that correctly sets
the "mapred.output.dir" param required by the HadoopOutputFormatBase, so I can
make it work:

      Job job = Job.getInstance();
      job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
      HBaseTableOutputFormat<Text> hbaseTOF = new HBaseTableOutputFormat<>();
      HadoopOutputFormat<Text, Put> outOF = new HadoopOutputFormat<>(hbaseTOF, job);
      myDataset.output(outOF);

3) However, this still does not work unless you also call setConf() on
Configurable subclasses in the HadoopOutputFormatBase:

- in the public void finalizeGlobal(int parallelism) throws IOException method:

    ....
    if (this.mapreduceOutputFormat instanceof Configurable) {
        ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
    }
    this.fileOutputCommitter = new FileOutputCommitter(
        new Path(this.configuration.get("mapred.output.dir")), taskContext);
    ....

- in the public void open(int taskNumber, int numTasks) throws IOException method:

    ....
    if (this.mapreduceOutputFormat instanceof Configurable) {
        ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
    }
    try {
        this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    ....

4) The modifications made in point 3 should probably be applied to both the
mapreduce and mapred packages.
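For reference, the guard both snippets rely on can be written as a tiny
standalone helper (a sketch; the class and method names are made up for
illustration, they are not part of Flink):

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.OutputFormat;

    // Sketch: push a Hadoop Configuration into a wrapped OutputFormat, but
    // only if the format declares itself Configurable (as HBase's
    // TableOutputFormat does).
    final class ConfigurableOutputFormats {

        private ConfigurableOutputFormats() {
        }

        static void setConfIfConfigurable(OutputFormat<?, ?> format, Configuration conf) {
            if (format instanceof Configurable) {
                ((Configurable) format).setConf(conf);
            }
        }
    }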

Thanks in advance,
Flavio



-----------------------------------------------------------------------
this is the HBaseTableOutputFormat.java:
-----------------------------------------------------------------------
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
 * while the output value <u>must</u> be either a {@link Put} or a
 * {@link Delete} instance.
 *
 * @param <KEY>  The type of the key. Ignored in this class.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HBaseTableOutputFormat<KEY> extends OutputFormat<KEY, Put>
    implements Configurable {

  private final Log LOG = LogFactory.getLog(HBaseTableOutputFormat.class);

  /** Job parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Optional job parameter to specify a peer cluster.
   * Used specifying remote cluster when copying between hbase clusters (the
   * source is picked up from <code>hbase-site.xml</code>).
   * @see TableMapReduceUtil#initTableReducerJob(String, Class,
   *      org.apache.hadoop.mapreduce.Job, Class, String, String, String)
   */
  public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";

  /** Optional job parameter to specify peer cluster's ZK client port */
  public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port";

  /** Optional specification of the rs class name of the peer cluster */
  public static final String REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";

  /** Optional specification of the rs impl name of the peer cluster */
  public static final String REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";

  /** The configuration. */
  private Configuration conf = null;

  private HTable table;

  /**
   * Writes the reducer output to an HBase table.
   *
   * @param <KEY>  The type of the key.
   */
  protected static class TableRecordWriter<KEY> extends RecordWriter<KEY, Put> {

    /** The table to write to. */
    private HTable table;

    /**
     * Instantiate a TableRecordWriter with the HBase HClient for writing.
     *
     * @param table  The table to write to.
     */
    public TableRecordWriter(HTable table) {
      this.table = table;
    }

    /**
     * Closes the writer, in this case flush table commits.
     *
     * @param context  The context.
     * @throws IOException When closing the writer fails.
     * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
      table.close();
    }

    /**
     * Writes a key/value pair into the table.
     *
     * @param key  The key.
     * @param value  The value.
     * @throws IOException When writing fails.
     * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
     */
    @Override
    public void write(KEY key, Put value) throws IOException {
      if (value instanceof Put) this.table.put(new Put((Put) value));
      // else if (value instanceof Delete) this.table.delete(new Delete((Delete) value));
      else throw new IOException("Pass a Delete or a Put");
    }
  }

  /**
   * Creates a new record writer.
   *
   * @param context  The current task context.
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   * @throws InterruptedException When the job is cancelled.
   * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordWriter<KEY, Put> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableRecordWriter<KEY>(this.table);
  }

  /**
   * Checks if the output target exists.
   *
   * @param context  The current context.
   * @throws IOException When the check fails.
   * @throws InterruptedException When the job is aborted.
   * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public void checkOutputSpecs(JobContext context) throws IOException,
      InterruptedException {
    // TODO Check if the table exists?
  }

  /**
   * Returns the output committer.
   *
   * @param context  The current context.
   * @return The committer.
   * @throws IOException When creating the committer fails.
   * @throws InterruptedException When the job is aborted.
   * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration otherConf) {
    this.conf = HBaseConfiguration.create(otherConf);

    String tableName = this.conf.get(OUTPUT_TABLE);
    if (tableName == null || tableName.length() <= 0) {
      throw new IllegalArgumentException("Must specify table name");
    }

    String address = this.conf.get(QUORUM_ADDRESS);
    int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
    String serverClass = this.conf.get(REGION_SERVER_CLASS);
    String serverImpl = this.conf.get(REGION_SERVER_IMPL);

    try {
      if (address != null) {
        ZKUtil.applyClusterKeyToConf(this.conf, address);
      }
      if (serverClass != null) {
        this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
      }
      if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
      }
      this.table = new HTable(this.conf, tableName);
      this.table.setAutoFlush(false, true);
      // Addition w.r.t. HBase's TableOutputFormat: expose the table directory
      // as "mapred.output.dir" so that Flink's HadoopOutputFormatBase can
      // create its FileOutputCommitter.
      String outDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), this.table.getName()).toString();
      this.conf.set("mapred.output.dir", outDir);
      otherConf.set("mapred.output.dir", outDir);
      LOG.info("Created table instance for " + tableName);
    } catch (IOException e) {
      LOG.error(e);
      throw new RuntimeException(e);
    }
  }
}

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Fabian Hueske <fh...@gmail.com>.
As I said before, I think the configure() method of the original
HadoopOutputFormat should be called in the configure() method of the Flink
HadoopOutputFormatBase. Flink calls configure() before open() and
finalizeOnMaster(), so that should work.

Have you checked if that fixes your problem?
If yes, I'd suggest opening a PR with this fix.
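
For illustration, one way to read that for the mapreduce package could look
like this (just a sketch, not tested; the mapreduceOutputFormat field and the
Hadoop configuration field are assumed from HadoopOutputFormatBase):

    // Sketch: forward Flink's configure() to the wrapped Hadoop format.
    // 'parameters' is Flink's own Configuration and is not needed here;
    // 'this.configuration' is the Hadoop Configuration held by the wrapper.
    @Override
    public void configure(org.apache.flink.configuration.Configuration parameters) {
        if (this.mapreduceOutputFormat instanceof Configurable) {
            ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
        }
    }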

Thanks, Fabian


Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Maximilian Michels <mx...@apache.org>.
Just base your changes on the current master.

On Wed, Apr 1, 2015 at 2:12 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> Ok..I'd like to have this fix in the next release. Should I branch Flink
> 0.8.1 or 0.9 or which version?
>
> On Wed, Apr 1, 2015 at 2:04 PM, Maximilian Michels <mx...@apache.org> wrote:
>
> > Hi Flavio,
> >
> > Thanks for looking into this problem. Actually, it's a bit difficult to
> > discuss your changes here because of the formatting/syntax highlighting
> > and missing context of the classes. Usually, we do that in a pull request. Do
> > you have a GitHub account? If so, push your changes to your forked Flink
> > repository. GitHub will then offer you to create a pull request for your
> > modified branch.
> >
> > Let's discuss your changes on GitHub.
> >
> > Best,
> > Max
> >

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Flavio Pompermaier <po...@okkam.it>.
I created a JIRA ticket for this problem
(https://issues.apache.org/jira/browse/FLINK-1828) and I just made a PR that
fixes it (thanks Fabian and Robert for the great support!)
Last question: is the "mapred.output.dir" parameter really necessary?
At the end of my job that writes to HBase I found only _SUCCESS and
._SUCCESS.crc files..

Best.
Flavio


Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by fh...@gmail.com.
Yes, reusing output objects is a good practice but optional. It can help to bring down GC overhead.

You could make your function a RichFunction and initialize the output object in open(). 
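
For example, a minimal sketch of that pattern (class and field names are
illustrative):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    // Sketch: create the reusable output tuple once, in open(), and mutate
    // it per record instead of allocating a new tuple for every element.
    public class CountOnes extends RichMapFunction<String, Tuple2<String, Integer>> {

        private transient Tuple2<String, Integer> reuse;

        @Override
        public void open(Configuration parameters) {
            reuse = new Tuple2<String, Integer>();
        }

        @Override
        public Tuple2<String, Integer> map(String value) {
            reuse.f0 = value;
            reuse.f1 = 1;
            return reuse; // same object every call; fewer short-lived allocations
        }
    }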



Switching function serialization to Kryo is on our TODO list (FLINK-1256). Would be good to fix that soon, IMO.


Cheers, Fabian



Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Flavio Pompermaier <po...@okkam.it>.
Ok great! This was not perfectly clear to me! I'll try that now.

About the reuse variable instead: I use it because I saw that this is a common
practice in the examples, but I'd like to know whether there's a real
benefit in reusing it for the return tuple with respect to returning a brand
new one each time. Any insight about this?

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Fabian Hueske <fh...@gmail.com>.
User functions are still serialized using Java serialization, not Kryo.
Kryo is only used for data exchange at runtime between tasks.

If a function such as your MapFunction has a non-serializable member
variable, you need to declare it as transient and initialize it before it
is executed, e.g., via open() or the first invocation of the function's
processing method, such as map().
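
A minimal sketch for the HBase case in this thread (the column names are
illustrative, mirroring the map function quoted at the end of the thread):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;

    // Sketch: the tuple that will hold the non-serializable Put is
    // transient, so it never travels with the serialized function; it is
    // created on the worker in open(), after deserialization.
    public class ToMutation extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>> {

        private transient Tuple2<Text, Mutation> reuse;

        @Override
        public void open(Configuration parameters) {
            reuse = new Tuple2<Text, Mutation>();
        }

        @Override
        public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) {
            Put put = new Put(Bytes.toBytes(t.f0));
            put.add(Bytes.toBytes("test-column"), Bytes.toBytes("value"), Bytes.toBytes(t.f1));
            reuse.f0 = new Text(t.f0);
            reuse.f1 = put;
            return reuse;
        }
    }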


Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Flavio Pompermaier <po...@okkam.it>.
There's no way to register Put with Kryo, for example?

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Robert Metzger <rm...@apache.org>.
Hey Flavio,

I checked out your "master" branch and started the HBaseWriteExample.
It started without errors (there were some errors connecting to Zookeeper,
but that's probably because I don't have HBase running).
Am I using the right code (
https://github.com/fpompermaier/flink/commit/c1934da379dba360ad61d18bf921fae08822795a)
to reproduce this error?
Maybe the error is also happening when the mapper is starting.

Can you try making the following changes to your code?
https://gist.github.com/rmetzger/a218beca4b0442f3c1f3
This is basically making the field that contains the non-serializable "Put"
element transient.

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Flavio Pompermaier <po...@okkam.it>.
Any fix for this?
On Apr 3, 2015 7:43 AM, "Flavio Pompermaier" <po...@okkam.it> wrote:

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Flavio Pompermaier <po...@okkam.it>.
Which field? The Tuple2? I use it with Flink 0.8.1 without errors.
On Apr 3, 2015 2:27 AM, <fh...@gmail.com> wrote:

> If Put is not Serializable it cannot be serialized and shipped.
>
> Is it possible to make that field transient and initialize Put in
> configure()?
>
> From: Flavio Pompermaier
> Sent: Friday, 3. April, 2015 01:42
> To: dev@flink.apache.org
>
> Now I made my fork (https://github.com/fpompermaier/flink) but when I run
> the application I get this error:
>
> java.io.NotSerializableException: org.apache.hadoop.hbase.client.Put
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at java.util.ArrayList.writeObject(ArrayList.java:742)
>     at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:286)
>     at org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:74)
>
> I started from the wordcount example and my code is:
>
>     Job job = Job.getInstance();
>     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
>     job.getConfiguration().set("mapred.output.dir", "/tmp/test");
>     counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
>         private final byte[] CF_SOME = Bytes.toBytes("test-column");
>         private final byte[] Q_SOME = Bytes.toBytes("value");
>         private Tuple2<Text, Mutation> reuse = new Tuple2<Text, Mutation>();
>
>         @Override
>         public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
>             reuse.f0 = new Text(t.f0);
>             Put put = new Put(t.f0.getBytes());
>             put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
>             reuse.f1 = put;
>             return reuse;
>         }
>     }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
>
> Do I have to register how to serialize Put somewhere?
>
> On Wed, Apr 1, 2015 at 2:32 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
> > Whatever works best for you.
> > We can easily backport or forwardport the patch.
> >
> > > > > >       table.close();
> > > > > >     }
> > > > > >
> > > > > >     /**
> > > > > >      * Writes a key/value pair into the table.
> > > > > >      *
> > > > > >      * @param key  The key.
> > > > > >      * @param value  The value.
> > > > > >      * @throws IOException When writing fails.
> > > > > >      * @see
> > > > > > org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object,
> > > > > > java.lang.Object)
> > > > > >      */
> > > > > >     @Override
> > > > > >     *public void write(KEY key, Put value)*
> > > > > > *    throws IOException {*
> > > > > > *      if (value instanceof Put) this.table.put(new
> > > Put((Put)value));*
> > > > > > *//      else if (value instanceof Delete) this.table.delete(new
> > > > > > Delete((Delete)value));*
> > > > > > *      else throw new IOException("Pass a Delete or a Put");*
> > > > > > *    }*
> > > > > >   }
> > > > > >
> > > > > >   /**
> > > > > >    * Creates a new record writer.
> > > > > >    *
> > > > > >    * @param context  The current task context.
> > > > > >    * @return The newly created writer instance.
> > > > > >    * @throws IOException When creating the writer fails.
> > > > > >    * @throws InterruptedException When the jobs is cancelled.
> > > > > >    * @see
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
> > > > > >    */
> > > > > >   @Override
> > > > > >   public RecordWriter<KEY, *Put*> getRecordWriter(
> > > > > >     TaskAttemptContext context)
> > > > > >   throws IOException, InterruptedException {
> > > > > >     return new TableRecordWriter<KEY>(this.table);
> > > > > >   }
> > > > > >
> > > > > >   /**
> > > > > >    * Checks if the output target exists.
> > > > > >    *
> > > > > >    * @param context  The current context.
> > > > > >    * @throws IOException When the check fails.
> > > > > >    * @throws InterruptedException When the job is aborted.
> > > > > >    * @see
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
> > > > > >    */
> > > > > >   @Override
> > > > > >   public void checkOutputSpecs(JobContext context) throws
> > > IOException,
> > > > > >       InterruptedException {
> > > > > >     // TODO Check if the table exists?
> > > > > >
> > > > > >   }
> > > > > >
> > > > > >   /**
> > > > > >    * Returns the output committer.
> > > > > >    *
> > > > > >    * @param context  The current context.
> > > > > >    * @return The committer.
> > > > > >    * @throws IOException When creating the committer fails.
> > > > > >    * @throws InterruptedException When the job is aborted.
> > > > > >    * @see
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
> > > > > >    */
> > > > > >   @Override
> > > > > >   public OutputCommitter getOutputCommitter(TaskAttemptContext
> > > context)
> > > > > >   throws IOException, InterruptedException {
> > > > > >     return new TableOutputCommitter();
> > > > > >   }
> > > > > >
> > > > > >   public Configuration getConf() {
> > > > > >     return conf;
> > > > > >   }
> > > > > >
> > > > > >   @Override
> > > > > >   public void setConf(Configuration otherConf) {
> > > > > >     this.conf = HBaseConfiguration.create(otherConf);
> > > > > >
> > > > > >     String tableName = this.conf.get(OUTPUT_TABLE);
> > > > > >     if(tableName == null || tableName.length() <= 0) {
> > > > > >       throw new IllegalArgumentException("Must specify table
> > name");
> > > > > >     }
> > > > > >
> > > > > >     String address = this.conf.get(QUORUM_ADDRESS);
> > > > > >     int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
> > > > > >     String serverClass = this.conf.get(REGION_SERVER_CLASS);
> > > > > >     String serverImpl = this.conf.get(REGION_SERVER_IMPL);
> > > > > >
> > > > > >     try {
> > > > > >       if (address != null) {
> > > > > >         ZKUtil.applyClusterKeyToConf(this.conf, address);
> > > > > >       }
> > > > > >       if (serverClass != null) {
> > > > > >         this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
> > > > > >       }
> > > > > >       if (zkClientPort != 0) {
> > > > > >         this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT,
> > > > zkClientPort);
> > > > > >       }
> > > > > >       this.table = new HTable(this.conf, tableName);
> > > > > >       this.table.setAutoFlush(false, true);
> > > > > >     *  String outDir =
> > FSUtils.getTableDir(FSUtils.getRootDir(conf),
> > > > > > this.table.getName()).toString();*
> > > > > > *      this.conf.set("mapred.output.dir", outDir);*
> > > > > > *      otherConf.set("mapred.output.dir", outDir);*
> > > > > >       LOG.info("Created table instance for "  + tableName);
> > > > > >     } catch(IOException e) {
> > > > > >       LOG.error(e);
> > > > > >       throw new RuntimeException(e);
> > > > > >     }
> > > > > >   }
> > > > > > }
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by fh...@gmail.com.
If Put is not Serializable, it cannot be serialized and shipped.

Is it possible to make that field transient and initialize the Put in
configure()?
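
For illustration, the idea could look roughly like this (an untested sketch:
it assumes the field lives in a RichMapFunction so that open() can play the
role of configure(); the class name is made up):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class WordCountToPutMapper
        extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>> {

    private final byte[] CF_SOME = Bytes.toBytes("test-column");
    private final byte[] Q_SOME = Bytes.toBytes("value");

    // transient: the reuse tuple (and any Put it holds) is never shipped
    // with the serialized function object
    private transient Tuple2<Text, Mutation> reuse;

    @Override
    public void open(Configuration parameters) {
        // re-created locally on every parallel task instance
        reuse = new Tuple2<Text, Mutation>();
    }

    @Override
    public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
        reuse.f0 = new Text(t.f0);
        Put put = new Put(t.f0.getBytes());
        put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
        reuse.f1 = put;
        return reuse;
    }
}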






From: Flavio Pompermaier
Sent: Friday, 3 April 2015, 01:42
To: dev@flink.apache.org





[...]

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Flavio Pompermaier <po...@okkam.it>.
I have now created my fork (https://github.com/fpompermaier/flink), but when
I run the application I get this error:

 java.io.NotSerializableException: org.apache.hadoop.hbase.client.Put
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at java.util.ArrayList.writeObject(ArrayList.java:742)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:286)
at org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:74)

I started from the wordcount example and my code is:

        Job job = Job.getInstance();
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
        job.getConfiguration().set("mapred.output.dir", "/tmp/test");
        counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {

            private final byte[] CF_SOME = Bytes.toBytes("test-column");
            private final byte[] Q_SOME = Bytes.toBytes("value");
            private Tuple2<Text, Mutation> reuse = new Tuple2<Text, Mutation>();

            @Override
            public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
                reuse.f0 = new Text(t.f0);
                Put put = new Put(t.f0.getBytes());
                put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
                reuse.f1 = put;
                return reuse;
            }
        }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

Do I have to register a serializer for Put somewhere?
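
For reference, the kind of registration I have in mind would look roughly
like this. This is only a sketch: registerKryoType() is the ExecutionConfig
hook I know from Flink 0.9, so whether an equivalent exists in 0.8.1, and
whether it would even help here (the trace above shows plain Java
serialization inside the runtime rather than the data-path serializers), is
an open question.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.hbase.client.Put;

public class KryoRegistrationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Ask Flink to route Put through Kryo instead of requiring
        // java.io.Serializable when Put is used as a record type.
        env.getConfig().registerKryoType(Put.class);
        // ... build the wordcount job on env as above and call env.execute() ...
    }
}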

On Wed, Apr 1, 2015 at 2:32 PM, Fabian Hueske <fh...@gmail.com> wrote:

> Whatever works best for you.
> We can easily backport or forward-port the patch.
>
> [...]

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Fabian Hueske <fh...@gmail.com>.
Whatever works best for you.
We can easily backport or forward-port the patch.

2015-04-01 14:12 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> Ok, I'd like to have this fix in the next release. Should I branch from
> Flink 0.8.1, from 0.9, or from some other version?
>
> [...]

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Flavio Pompermaier <po...@okkam.it>.
OK, I'd like to have this fix in the next release. Should I branch from Flink
0.8.1 or from 0.9, or is there another branch I should target?
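
For reference, the guard I have in mind is tiny. Here is a minimal,
hypothetical sketch (the helper class and its name are mine, not the actual
patch); in the real fix the same instanceof check would go into open() and
finalizeGlobal() of HadoopOutputFormatBase, for both the mapreduce and the
mapred packages:

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

// Hypothetical helper illustrating the proposed guard. The wrapped
// output format reaches the task through Flink's own serialization,
// so nothing ever calls setConf() on it; Configurable formats must
// therefore be configured explicitly before "mapred.output.dir" is read.
public final class ConfigurableSupport {

  private ConfigurableSupport() {
  }

  public static void configureIfPossible(Object outputFormat, Configuration conf) {
    if (outputFormat instanceof Configurable) {
      ((Configurable) outputFormat).setConf(conf);
    }
  }
}

With that guard in place, the HBaseTableOutputFormat usage from my first mail
works without further changes.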

On Wed, Apr 1, 2015 at 2:04 PM, Maximilian Michels <mx...@apache.org> wrote:

> Hi Flavio,
>
> Thanks for looking into this problem. Actually, it's a bit difficult to
> discuss your changes here because of the formatting/syntax highlighting and
> missing context of the classes. Usually, we do that in a pull request. Do
> you have a GitHub account? If so, push your changes to your forked Flink
> repository. GitHub will then offer you to create a pull request for your
> modified branch.
>
> Let's discuss your changes on GitHub.
>
> Best,
> Max
>

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Maximilian Michels <mx...@apache.org>.
Hi Flavio,

Thanks for looking into this problem. Actually, it's a bit difficult to
discuss your changes here because of the formatting/syntax highlighting and
missing context of the classes. Usually, we do that in a pull request. Do
you have a GitHub account? If so, push your changes to your forked Flink
repository. GitHub will then offer you to create a pull request for your
modified branch.

Let's discuss your changes on GitHub.

Best,
Max

On Wed, Apr 1, 2015 at 1:44 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> Any feedback about this?
>

Re: HBase TableOutputFormat fix (Flink 0.8.1)

Posted by Flavio Pompermaier <po...@okkam.it>.
Any feedback about this?
