You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@phoenix.apache.org by "Thangamani, Arun" <th...@cobalt.com> on 2015/12/07 08:45:12 UTC

system.catalog and system.stats entries slowing down bulk MR insert by 20-25X

Hello, I noticed an issue with bulk inserts through MapReduce in Phoenix 4.4.0.2.3.0.0-2557, using the code outlined below.

Normally, inserting about 25 million rows completes in about 5 minutes. I have 5 region servers, and the Phoenix table has 32 buckets.
But sometimes (maybe after major compactions or region movement?) writes simply slow down to 90 minutes. When I truncate the SYSTEM.STATS HBase table, the inserts get a little faster (60 minutes), but when I truncate both the SYSTEM.CATALOG and SYSTEM.STATS tables and recreate the Phoenix table definition(s), the inserts go back to 5 minutes. The workaround of truncating the SYSTEM tables is not sustainable for long. Can someone help and let me know if there is a patch available for this? Thanks in advance for the help.

// Comma-separated column list of the target Phoenix table, handed to setOutput below.
String outputColumns = "WEB_ID,WEB_PAGE_LABEL,DEVICE_TYPE,"
        + "WIDGET_INSTANCE_ID,WIDGET_TYPE,WIDGET_VERSION,WIDGET_CONTEXT,"
        + "TOTAL_CLICKS,TOTAL_CLICK_VIEWS,TOTAL_HOVER_TIME_MS,TOTAL_TIME_ON_PAGE_MS,TOTAL_VIEWABLE_TIME_MS,"
        + "VIEW_COUNT,USER_SEGMENT,DIM_DATE_KEY,VIEW_DATE,VIEW_DATE_TIMESTAMP,ROW_NUMBER";

Job job = Job.getInstance(conf, NAME);

// Output side: write to the Phoenix table through PhoenixOutputFormat.
PhoenixMapReduceUtil.setOutput(job, tableName, outputColumns);
job.setOutputFormatClass(PhoenixOutputFormat.class);

// Input side: read the files under inputPath.
FileInputFormat.setInputPaths(job, inputPath);

// Mapper and the key/value types it emits.
job.setMapperClass(WidgetPhoenixMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(WidgetPagesStatsWritable.class);

TableMapReduceUtil.addDependencyJars(job);

// Zero reduce tasks: the job is map-only.
job.setNumReduceTasks(0);
job.waitForCompletion(true);

/**
 * Mapper that turns one input text line into a {@code WidgetPagesStatsWritable}.
 *
 * <p>Each line is parsed with {@code PagesSegmentWidgetLineParser}. A {@code null} parse result is
 * skipped silently. A failure while parsing or copying fields is treated as a best-effort skip:
 * the stack trace is printed and a counter is incremented. A failure of {@code context.write}
 * is NOT swallowed; it propagates to the framework so a failed write cannot be silently lost.
 */
public static class WidgetPhoenixMapper extends Mapper<LongWritable, Text, NullWritable, WidgetPagesStatsWritable> {

    /** Counter group used for records this mapper skips because of an exception. */
    private static final String COUNTER_GROUP = "WidgetPhoenixMapper";

    /** Counter name for records skipped because parsing or field copying threw. */
    private static final String SKIPPED_RECORDS = "SKIPPED_RECORDS";

    /**
     * Parses {@code text} and emits the resulting record under a {@code NullWritable} key.
     *
     * @param longWritable input key; not used by the visible code
     * @param text one input line to parse
     * @param context task context used to read configuration, bump counters and emit output
     * @throws IOException if {@code context.write} fails
     * @throws InterruptedException if {@code context.write} is interrupted
     */
    @Override
    public void map(LongWritable longWritable, Text text, Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // Not referenced in the visible code; kept because the elided section below may use it.
        String rundateString = conf.get("rundate");
        // NOTE(review): a new parser is created for every record; if the parser is reusable it
        // could become a field of the mapper — confirm before changing.
        PagesSegmentWidgetLineParser parser = new PagesSegmentWidgetLineParser();

        WidgetPagesStatsWritable widgetPagesStatsWritable;
        try {
            PagesSegmentWidget pagesSegmentWidget = parser.parse(text.toString());
            if (pagesSegmentWidget == null) {
                // Nothing to emit for this line.
                return;
            }

            widgetPagesStatsWritable = new WidgetPagesStatsWritable();
            WidgetPagesStats widgetPagesStats = new WidgetPagesStats();

            widgetPagesStats.setWebId(pagesSegmentWidget.getWebId());
            widgetPagesStats.setWebPageLabel(pagesSegmentWidget.getWebPageLabel());
            widgetPagesStats.setWidgetInstanceId(pagesSegmentWidget.getWidgetInstanceId());
            …..

            widgetPagesStatsWritable.setWidgetPagesStats(widgetPagesStats);
        } catch (Exception e) {
            // Best effort: one bad input line must not fail the task. Keep the stack trace in the
            // task's stderr (no logger is visible in this snippet) and make the skip countable.
            e.printStackTrace();
            context.getCounter(COUNTER_GROUP, SKIPPED_RECORDS).increment(1);
            return;
        }

        // Deliberately outside the try: a failed or interrupted write must surface to the caller.
        context.write(NullWritable.get(), widgetPagesStatsWritable);
    }
}

/**
 * Plain value holder for one widget/page statistics record.
 *
 * <p>Only part of the class is shown in this post: the remaining fields and the accessors used
 * elsewhere in the snippet (for example {@code setWebId} / {@code getWebId}) are elided, as are
 * the bodies of {@code equals} and {@code hashCode}.
 */
public final class WidgetPagesStats {
    private String webId;
    private String webPageLabel;
    private long widgetInstanceId;
    private String widgetType;

        …
    // equals/hashCode/toString are overridden; their bodies are elided in the original post.
    @Override
    public boolean equals(Object o) {

        ..
    }
    @Override
    public int hashCode() {

        ..
    }
    // Returns a string starting with "WidgetPhoenix{"; the rest of the expression is elided.
    @Override
    public String toString() {
        return "WidgetPhoenix{“….
                '}';
    }
}

/**
 * {@code Writable} / {@code DBWritable} wrapper around a {@link WidgetPagesStats} value.
 *
 * <p>The binary form written by {@link #write(DataOutput)} is read back by
 * {@link #readFields(DataInput)}: every string goes through {@code writeUTF}/{@code readUTF} and
 * every long through {@code writeLong}/{@code readLong}, in the same field order. The JDBC form
 * binds the same values onto a {@code PreparedStatement} or reads them from a {@code ResultSet}.
 *
 * <p>Fields elided in the original post are still marked with {@code …} / {@code ..}; they must
 * follow the same paired read/write pattern.
 */
public class WidgetPagesStatsWritable implements DBWritable, Writable {

    private WidgetPagesStats widgetPagesStats;

    /**
     * Populates the wrapped value from the binary form produced by {@link #write(DataOutput)}.
     *
     * @param input stream positioned at the start of a serialized record
     * @throws IOException if the stream cannot be read
     */
    @Override
    public void readFields(DataInput input) throws IOException {
        // An instance created through the no-arg constructor has no wrapped value yet.
        ensureWidgetPagesStats();
        // readUTF mirrors writeUTF; readLine would run past the field, since no terminator is written.
        widgetPagesStats.setWebId(input.readUTF());
        widgetPagesStats.setWebPageLabel(input.readUTF());
        widgetPagesStats.setWidgetInstanceId(input.readLong());
        widgetPagesStats.setWidgetType(input.readUTF());

        …
    }

    /**
     * Serializes the wrapped value in the field order expected by {@link #readFields(DataInput)}.
     *
     * <p>{@code writeUTF} length-prefixes each string so it can be read back exactly. It rejects
     * {@code null} and strings whose encoded form exceeds 65535 bytes.
     *
     * @param output stream to write to
     * @throws IOException if the stream cannot be written
     */
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeUTF(widgetPagesStats.getWebId());
        output.writeUTF(widgetPagesStats.getWebPageLabel());

        output.writeLong(widgetPagesStats.getWidgetInstanceId());
        output.writeUTF(widgetPagesStats.getWidgetType());

        ..
    }

    /**
     * Populates the wrapped value from the current row of {@code rs}.
     *
     * @param rs result set positioned on the row to read
     * @throws SQLException if a column cannot be read
     */
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        ensureWidgetPagesStats();
        widgetPagesStats.setWebId(rs.getString("WEB_ID"));
        widgetPagesStats.setWebPageLabel(rs.getString("WEB_PAGE_LABEL"));
        widgetPagesStats.setWidgetInstanceId(rs.getLong("WIDGET_INSTANCE_ID"));
        widgetPagesStats.setWidgetType(rs.getString("WIDGET_TYPE"));

        …
    }

    /**
     * Binds the wrapped value onto {@code pstmt} by 1-based parameter position.
     *
     * @param pstmt statement to bind parameters on
     * @throws SQLException if a parameter cannot be set
     */
    @Override
    public void write(PreparedStatement pstmt) throws SQLException {
        // Not used by the visible code; kept because the elided section may use them.
        Connection connection = pstmt.getConnection();
        PhoenixConnection phoenixConnection = (PhoenixConnection) connection;
        //connection.getClientInfo().setProperty("scn", Long.toString(widgetPhoenix.getViewDateTimestamp()));

        pstmt.setString(1, widgetPagesStats.getWebId());
        pstmt.setString(2, widgetPagesStats.getWebPageLabel());
        pstmt.setString(3, widgetPagesStats.getDeviceType());

        pstmt.setLong(4, widgetPagesStats.getWidgetInstanceId());

        …
    }

    /** Returns the wrapped value, or {@code null} if none has been set or read yet. */
    public WidgetPagesStats getWidgetPagesStats() {
        return widgetPagesStats;
    }

    /** Replaces the wrapped value. */
    public void setWidgetPagesStats(WidgetPagesStats widgetPagesStats) {
        this.widgetPagesStats = widgetPagesStats;
    }

    /** Creates the wrapped value on first use so the readFields methods never hit a null field. */
    private void ensureWidgetPagesStats() {
        if (widgetPagesStats == null) {
            widgetPagesStats = new WidgetPagesStats();
        }
    }
}


----------------------------------------------------------------------
This message and any attachments are intended only for the use of the addressee and may contain information that is privileged and confidential. If the reader of the message is not the intended recipient or an authorized representative of the intended recipient, you are hereby notified that any dissemination of this communication is strictly prohibited. If you have received this communication in error, notify the sender immediately by return email and delete the message and any attachments from your system.

Re: system.catalog and system.stats entries slowing down bulk MR insert by 20-25X

Posted by Nick Dimiduk <nd...@apache.org>.
Hi Arun,

Did you ever find a solution to this one? Probably a good place to start is
the logs from your MR job while the slowdown is happening.

-n

On Sun, Dec 6, 2015 at 11:45 PM, Thangamani, Arun <th...@cobalt.com>
wrote:

> Hello, I noticed an issue with bulk insert through map reduce in phoenix
> 4.4.0.2.3.0.0-2557, using outline of the code below
>
> Normally the inserts of about 25 million rows complete in about 5 mins, I
> have 5 region servers and the phoenix table has 32 buckets
> But sometimes (maybe after major compactions or region movement?), writes
> simply slow down to 90 mins, when I truncate SYSTEM.STATS hbase table, the
> inserts get a little faster (60 mins), but when I truncate both
> SYSTEM.CATALOG & SYSTEM.STATS tables, and recreate the phoenix table def(s)
> the inserts go back to 5 mins, the workaround of truncating SYSTEM tables
> is not sustainable for long, can someone help and let me know if there is a
> patch available for this? Thanks in advance for the help.
>
> Job job = Job.getInstance(conf, NAME);
> // Set the target Phoenix table and the columns
> PhoenixMapReduceUtil.setOutput(job, tableName,
> "WEB_ID,WEB_PAGE_LABEL,DEVICE_TYPE," +
>
>         "WIDGET_INSTANCE_ID,WIDGET_TYPE,WIDGET_VERSION,WIDGET_CONTEXT," +
>
> "TOTAL_CLICKS,TOTAL_CLICK_VIEWS,TOTAL_HOVER_TIME_MS,TOTAL_TIME_ON_PAGE_MS,TOTAL_VIEWABLE_TIME_MS,"
> +
>
> "VIEW_COUNT,USER_SEGMENT,DIM_DATE_KEY,VIEW_DATE,VIEW_DATE_TIMESTAMP,ROW_NUMBER");
> FileInputFormat.setInputPaths(job, inputPath);
> job.setMapperClass(WidgetPhoenixMapper.class);
> job.setMapOutputKeyClass(NullWritable.class);
> job.setMapOutputValueClass(WidgetPagesStatsWritable.class);
> job.setOutputFormatClass(PhoenixOutputFormat.class);
> TableMapReduceUtil.addDependencyJars(job);
> job.setNumReduceTasks(0);
> job.waitForCompletion(true);
>
> public static class WidgetPhoenixMapper extends Mapper<LongWritable, Text,
> NullWritable, WidgetPagesStatsWritable> {
>     @Override
>     public void map(LongWritable longWritable, Text text, Context context)
> throws IOException, InterruptedException {
>         Configuration conf = context.getConfiguration();
>         String rundateString = conf.get("rundate");
>         PagesSegmentWidgetLineParser parser = new
> PagesSegmentWidgetLineParser();
>         try {
>             PagesSegmentWidget pagesSegmentWidget =
> parser.parse(text.toString());
>
>             if (pagesSegmentWidget != null) {
>                 WidgetPagesStatsWritable widgetPagesStatsWritable = new
> WidgetPagesStatsWritable();
>                 WidgetPagesStats widgetPagesStats = new WidgetPagesStats();
>
>                 widgetPagesStats.setWebId(pagesSegmentWidget.getWebId());
>
> widgetPagesStats.setWebPageLabel(pagesSegmentWidget.getWebPageLabel());
>
> widgetPagesStats.setWidgetInstanceId(pagesSegmentWidget.getWidgetInstanceId());
>                 …..
>
>
> widgetPagesStatsWritable.setWidgetPagesStats(widgetPagesStats);
>                 context.write(NullWritable.get(),
> widgetPagesStatsWritable);
>             }
>
>         }catch (Exception e){
>             e.printStackTrace();
>         }
>     }
> }
>
> public final class WidgetPagesStats {
>     private String webId;
>     private String webPageLabel;
>     private long widgetInstanceId;
>     private String widgetType;
>
>         …
>     @Override
>     public boolean equals(Object o) {
>
>         ..
>     }
>     @Override
>     public int hashCode() {
>
>         ..
>     }
>     @Override
>     public String toString() {
>         return "WidgetPhoenix{“….
>                 '}';
>     }
> }
>
> public class WidgetPagesStatsWritable implements DBWritable, Writable {
>
>     private WidgetPagesStats widgetPagesStats;
>
>     public void readFields(DataInput input) throws IOException {
>         widgetPagesStats.setWebId(input.readLine());
>         widgetPagesStats.setWebPageLabel(input.readLine());
>         widgetPagesStats.setWidgetInstanceId(input.readLong());
>         widgetPagesStats.setWidgetType(input.readLine());
>
>         …
>     }
>
>     public void write(DataOutput output) throws IOException {
>         output.writeBytes(widgetPagesStats.getWebId());
>         output.writeBytes(widgetPagesStats.getWebPageLabel());
>
>         output.writeLong(widgetPagesStats.getWidgetInstanceId());
>         output.writeBytes(widgetPagesStats.getWidgetType());
>
>         ..
>     }
>
>     public void readFields(ResultSet rs) throws SQLException {
>         widgetPagesStats.setWebId(rs.getString("WEB_ID"));
>         widgetPagesStats.setWebPageLabel(rs.getString("WEB_PAGE_LABEL"));
>
> widgetPagesStats.setWidgetInstanceId(rs.getLong("WIDGET_INSTANCE_ID"));
>         widgetPagesStats.setWidgetType(rs.getString("WIDGET_TYPE"));
>
>         …
>     }
>
>     public void write(PreparedStatement pstmt) throws SQLException {
>         Connection connection = pstmt.getConnection();
>         PhoenixConnection phoenixConnection = (PhoenixConnection)
> connection;
>         //connection.getClientInfo().setProperty("scn",
> Long.toString(widgetPhoenix.getViewDateTimestamp()));
>
>         pstmt.setString(1, widgetPagesStats.getWebId());
>         pstmt.setString(2, widgetPagesStats.getWebPageLabel());
>         pstmt.setString(3, widgetPagesStats.getDeviceType());
>
>         pstmt.setLong(4, widgetPagesStats.getWidgetInstanceId());
>
>         …
>     }
>
>     public WidgetPagesStats getWidgetPagesStats() {
>         return widgetPagesStats;
>     }
>
>     public void setWidgetPagesStats(WidgetPagesStats widgetPagesStats) {
>         this.widgetPagesStats = widgetPagesStats;
>     }
> }
>
>
> ----------------------------------------------------------------------
> This message and any attachments are intended only for the use of the
> addressee and may contain information that is privileged and confidential.
> If the reader of the message is not the intended recipient or an authorized
> representative of the intended recipient, you are hereby notified that any
> dissemination of this communication is strictly prohibited. If you have
> received this communication in error, notify the sender immediately by
> return email and delete the message and any attachments from your system.
>