You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pig.apache.org by "Jeff Markham (JIRA)" <ji...@apache.org> on 2013/01/01 20:50:13 UTC
[jira] [Updated] (PIG-2251) PIG leaks Zookeeper connections when
using HBaseStorage
[ https://issues.apache.org/jira/browse/PIG-2251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jeff Markham updated PIG-2251:
------------------------------
Attachment: PIG-2251.patch
HTable cleanup to prevent ZooKeeper connection leaks.
> PIG leaks Zookeeper connections when using HBaseStorage
> -------------------------------------------------------
>
> Key: PIG-2251
> URL: https://issues.apache.org/jira/browse/PIG-2251
> Project: Pig
> Issue Type: Bug
> Affects Versions: 0.8.1, 0.9.0
> Environment: PIG 0.9 branch
> HBase 0.90.3
> HDFS 0.20-append
> Reporter: Vincent BARAT
> Attachments: PIG-2251.patch
>
>
> I run a set of PIG jobs from a Java process (using PigServer). Most of which use HBaseStorage to load data from HBase.
> Each job is run using a new PigServer object, and I correctly call PigServer.shutdown() when my pig server is no longer used.
> Nevertheless, after a few hours of run, I notice that the number of connections to my Zookeeper servers reach the limit (300 in my case).
> It appears that each job leaks 4 or 5 Zookeeper connections.
> It was not the case with PIG 0.6.1 + HBase 0.20.6
> To solve this issue (temporarily) by killing the process running PIG after a few set of jobs have been run : connections are correctly closed.
> My process don't use HBase by itself, only HBaseStorage, so I guess the leak is in the code of HBaseStorage: maybe to cnx to HBase are not closed.
> All my request are simple request loading data from HBase, lik:
> {code}
> pigServer.registerQuery("start_sessions = LOAD '"
> + Analytics.getHBaseTableURL("startSession")
> + "' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('meta:sid meta:infoid meta:imei meta:timestamp') "
> + "AS (sid:chararray, infoid:chararray, imei:chararray, start:long);");
> pigServer.registerQuery("end_sessions = LOAD '"
> + Analytics.getHBaseTableURL("endSession")
> + "' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('meta:sid meta:timestamp meta:locid') "
> + "AS (sid:chararray, end:long, locid:chararray);");
> pigServer.registerQuery("sessions = JOIN start_sessions BY sid, end_sessions BY sid;");
> pigServer.store("sessions", Analytics.getOutputFilePath("sessions"), "BinStorage");
> {code}
> Code used to allocate a new PIG server:
> {code}
> public static PigServer getNewPigServer() throws IOException
> {
> /* Get system properties */
> Properties properties = new Properties();
> /* Set specific Hadoop properties for PIG jobs */
> properties.setProperty("mapred.child.java.opts", "-Xmx" + childMemory + "m");
> /* Create PIG context */
> PigContext context = new PigContext(local ? ExecType.LOCAL : ExecType.MAPREDUCE, properties);
> /* Create the PIG server */
> PigServer pigServer = new PigServer(context);
> /* Register our User Defined Functions (UDFs) */
> pigServer.registerJar(pigUdfsPath);
> /* Register shortcuts for our UDFs */
> pigServer.registerFunction("GetActivitiesLengthsRanges", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetActivitiesLengthsRanges"));
> pigServer.registerFunction("GetActivitiesLinks", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetActivitiesLinks"));
> pigServer.registerFunction("GetActivitiesPeriodsAndLengths", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetActivitiesPeriodsAndLengths"));
> pigServer.registerFunction("GetCountRange", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetCountRange"));
> pigServer.registerFunction("GetAllPeriods", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetAllPeriods"));
> pigServer.registerFunction("GetCountRangeLabel", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetCountRangeLabel"));
> pigServer.registerFunction("GetCountsAndLengthsByName", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetCountsAndLengthsByName"));
> pigServer.registerFunction("GetCountsByName", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetCountsByName"));
> pigServer.registerFunction("GetDayPeriod", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetDayPeriod"));
> pigServer.registerFunction("GetDayWeekMonthPeriods", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetDayWeekMonthPeriods"));
> pigServer.registerFunction("GetLengthRange", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetLengthRange"));
> pigServer.registerFunction("GetLengthRangeLabel", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetLengthRangeLabel"));
> pigServer.registerFunction("GetPeriods", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetPeriods"));
> pigServer.registerFunction("GetPeriodsAndLengths", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GetPeriodsAndLengths"));
> pigServer.registerFunction("NormalizeCarrierName", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.NormalizeCarrierName"));
> pigServer.registerFunction("NormalizeCountryCode", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.NormalizeCountryCode"));
> pigServer.registerFunction("NormalizeLocaleCode", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.NormalizeLocaleCode"));
> pigServer.registerFunction("NormalizeNetworkType", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.NormalizeNetworkType"));
> pigServer.registerFunction("NormalizeNetworkSubType", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.NormalizeNetworkSubType"));
> pigServer.registerFunction("NormalizePhoneManufacturer", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.NormalizePhoneManufacturer"));
> pigServer.registerFunction("NormalizePhoneModel", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.NormalizePhoneModel"));
> pigServer.registerFunction("NormalizeString", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.NormalizeString"));
> pigServer.registerFunction("SubString", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.SubString"));
> pigServer.registerFunction("GuessCountryCode", new FuncSpec(
> "com.ubikod.ermin.analytics.pigudf.GuessCountryCode"));
> /* Return this new instance of PIG server */
> return pigServer;
> }
> {code}
> Code used when PIG server no longer used:
> {code}
> pigServer.shutdown();
> {code}
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira