You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by wl...@apache.org on 2016/01/19 04:02:33 UTC
incubator-hawq git commit: HAWQ-274. Add check for segments'
temporary directories 1. add two columns in gp_segment_config,
failed_tmpdir_num and failed_tmpdir;
2. segment's temporary directory information is loaded in shared memory;
3. segment's
Repository: incubator-hawq
Updated Branches:
refs/heads/master 0019b5270 -> 7df00d587
HAWQ-274. Add check for segments' temporary directories
1. add two columns in gp_segment_config, failed_tmpdir_num and failed_tmpdir;
2. segment's temporary directory information is loaded in shared memory;
3. segment's RM process checks and reports failed tmp dir information in IMAlive message;
4. master's RM process updates the segment's status in the catalog table; if a segment has a failed tmp dir,
this segment is considered down.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/7df00d58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/7df00d58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/7df00d58
Branch: refs/heads/master
Commit: 7df00d58727b044b48b9c06c9e87ad4e937eb293
Parents: 0019b52
Author: Wen Lin <wl...@pivotal.io>
Authored: Tue Jan 19 10:58:39 2016 +0800
Committer: Wen Lin <wl...@pivotal.io>
Committed: Tue Jan 19 10:58:39 2016 +0800
----------------------------------------------------------------------
src/backend/cdb/cdbtmpdir.c | 331 ++++++++++++----
src/backend/postmaster/identity.c | 4 +-
.../communication/rmcomm_RMSEG2RM.c | 13 +-
src/backend/resourcemanager/include/dynrm.h | 2 +
.../resourcemanager/include/resourcepool.h | 25 +-
src/backend/resourcemanager/requesthandler.c | 44 ++-
.../resourcemanager/requesthandler_RMSEG.c | 73 +++-
src/backend/resourcemanager/resourcemanager.c | 70 ++--
.../resourcemanager/resourcemanager_RMSEG.c | 10 +-
src/backend/resourcemanager/resourcepool.c | 374 ++++++++++++++++---
src/backend/storage/ipc/ipci.c | 4 +-
src/backend/utils/gp/segadmin.c | 2 +
src/backend/utils/init/postinit.c | 4 +-
src/include/catalog/gp_segment_config.h | 13 +-
src/include/cdb/cdbtmpdir.h | 23 +-
src/include/storage/lwlock.h | 3 +-
tools/bin/gppylib/data/2.0.json | 16 +-
17 files changed, 823 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/cdb/cdbtmpdir.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbtmpdir.c b/src/backend/cdb/cdbtmpdir.c
index 34924db..40db883 100644
--- a/src/backend/cdb/cdbtmpdir.c
+++ b/src/backend/cdb/cdbtmpdir.c
@@ -17,113 +17,288 @@
* under the License.
*/
-#include "cdb/cdbtmpdir.h"
#include "postgres.h"
-
-#include <fcntl.h>
-#include <unistd.h>
-
-#include "access/heapam.h"
-#include "access/xact.h"
-#include "catalog/catalog.h"
-#include "catalog/namespace.h"
-#include "catalog/pg_authid.h"
-#include "catalog/pg_database.h"
-#include "catalog/pg_tablespace.h"
-#include "libpq/hba.h"
-#include "libpq/libpq-be.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbutil.h"
-#include "mb/pg_wchar.h"
+#include "cdb/cdbtmpdir.h"
+#include "cdb/cdbvars.h"
#include "miscadmin.h"
-#include "pgstat.h"
#include "postmaster/autovacuum.h"
-#include "postmaster/postmaster.h"
-#include "storage/backendid.h"
-#include "storage/fd.h"
#include "storage/ipc.h"
-#include "storage/proc.h"
-#include "storage/procarray.h"
-#include "storage/procsignal.h"
-#include "storage/sinvaladt.h"
-#include "storage/smgr.h"
-#include "utils/acl.h"
-#include "utils/flatfiles.h"
-#include "utils/guc.h"
-#include "utils/relcache.h"
-#include "utils/resscheduler.h"
-#include "utils/syscache.h"
-#include "utils/tqual.h" /* SharedSnapshot */
-#include "utils/portal.h"
-#include "pgstat.h"
-
-static List *initTmpDirList(List *list, char *tmpdir_config);
-static void destroyTmpDirList(List *list);
-
-List *initTmpDirList(List *list, char *tmpdir_string)
+#include "storage/shmem.h"
+#include <sys/stat.h>
+
+TmpDirInfo* TmpDirInfoArray = NULL;
+
+static List *tmpDirList = NULL;
+
+int32_t TmpDirNum = 0;
+
+Size TmpDirInfoArraySize(void);
+
+void TmpDirInfoArray_ShmemInit(void);
+
+char* GetTmpDirPathFromArray(int64_t idx);
+
+bool DestroyTmpDirInfoArray(TmpDirInfo *info);
+
+bool CheckTmpDirAvailable(char *path);
+
+void destroyTmpDirList(List *list)
+{
+ ListCell *lc = NULL;
+
+ foreach(lc, list)
+ {
+ char *tmpdir = (char *)lfirst(lc);
+ pfree(tmpdir);
+ }
+ list_free(list);
+}
+
+static bool CheckDirValid(char* path)
{
- int idx = -1;
- int i = 0;
- char *tmpdir;
-
- for (i=0;i<strlen(tmpdir_string);i++)
+ struct stat info;
+ if (path == NULL || stat(path, &info) < 0)
{
- if (tmpdir_string[i] == ',')
+ return false;
+ }
+ else
+ {
+ if (!S_ISDIR(info.st_mode))
+ return false;
+ else
+ return true;
+ }
+}
+
+static int GetTmpDirNumber(char* szTmpDir)
+{
+ int i = 0, idx = -1;
+ char *tmpdir = NULL;
+ int tmpDirNum = 0;
+ tmpDirList = NULL;
+
+ for (i = 0; i <= strlen(szTmpDir); i++)
+ {
+ if (szTmpDir[i] == ',' || i == strlen(szTmpDir))
{
- tmpdir = (char *)palloc0(i-idx);
- memcpy(tmpdir, tmpdir_string+idx+1, i-idx-1);
- list = lappend(list, tmpdir);
+ /* in case two commas are written together */
+ if (i-idx > 1)
+ {
+ tmpdir = (char *)palloc0(i-idx);
+ strncpy(tmpdir, szTmpDir+idx+1, i-idx-1);
+ if(CheckDirValid(tmpdir))
+ {
+ tmpDirNum++;
+ elog(LOG, "Get a temporary directory:%s", tmpdir);
+ tmpDirList = lappend(tmpDirList, tmpdir);
+ }
+ else
+ {
+ pfree(tmpdir);
+ }
+ }
idx = i;
}
}
- tmpdir = (char *)palloc0(i-idx);
- memcpy(tmpdir, tmpdir_string+idx+1, i-idx-1);
- list = lappend(list, tmpdir);
- return list;
+ elog(LOG, "Get %d temporary directories", tmpDirNum);
+ return tmpDirNum;
}
-void destroyTmpDirList(List *list)
+/*
+ * Calculate the size of share memory for temporary directory information
+ */
+Size TmpDirInfoArrayShmemSize(void)
{
- ListCell *lc = NULL;
-
- foreach(lc, list)
+
+ if (AmIMaster())
{
- char *tmpdir = (char *)lfirst(lc);
- pfree(tmpdir);
+ TmpDirNum = GetTmpDirNumber(rm_master_tmp_dirs);
}
- list_free(list);
+ else if (AmISegment())
+ {
+ TmpDirNum = GetTmpDirNumber(rm_seg_tmp_dirs);
+ }
+ else
+ {
+ elog(LOG, "Don't need create share memory for temporary directory information");
+ TmpDirNum = 0;
+ }
+
+ return MAXALIGN(TmpDirNum*sizeof(TmpDirInfo));
}
-void getLocalTmpDirFromMasterConfig(int session_id)
+/*
+ * Initialize share memory for temporary directory information
+ */
+void TmpDirInfoArrayShmemInit(void)
{
- List *tmpdirs = NULL;
-
- tmpdirs = initTmpDirList(tmpdirs, rm_master_tmp_dirs);
-
- LocalTempPath = pstrdup((char *)lfirst(list_nth_cell(tmpdirs, gp_session_id % list_length(tmpdirs))));
+ bool found = false;
- destroyTmpDirList(tmpdirs);
+ if (TmpDirNum == 0)
+ return;
+
+ TmpDirInfoArray = (TmpDirInfo *)ShmemInitStruct("Temporary Directory Information Cache",
+ TmpDirNum*sizeof(TmpDirInfo), &found);
+ if(!TmpDirInfoArray)
+ {
+ elog(FATAL,
+ "Could not initialize Temporary Directory Information shared memory");
+ }
+
+ if(!found)
+ {
+ ListCell *lc = NULL;
+ int32_t i = 0;
+ MemSet(TmpDirInfoArray, 0, TmpDirNum*sizeof(TmpDirInfo));
+ foreach(lc, tmpDirList) {
+ TmpDirInfoArray[i].available = true;
+ strncpy(TmpDirInfoArray[i].path, (char*)lfirst(lc), strlen((char*)lfirst(lc)));
+ i++;
+ }
+
+ if (tmpDirList)
+ {
+ destroyTmpDirList(tmpDirList);
+ }
+ }
+ elog(LOG, "Initialize share memeory for temporary directory info finish.");
+}
+
+/*
+ * Check if this temporary directory is OK to read or write.
+ * If not, it's probably due to disk error.
+ */
+bool CheckTmpDirAvailable(char *path)
+{
+ FILE *tmp = NULL;
+ bool ret = true;
+ char* fname = NULL;
+ char* testfile = "/checktmpdir.log";
+
+ /* write some bytes to a file to check if
+ * this temporary directory is OK.
+ */
+ fname = palloc0(strlen(path) + strlen(testfile) + 1);
+ strncpy(fname, path, strlen(path));
+ strncpy(fname + strlen(path), testfile, strlen(testfile));
+ tmp = fopen(fname, "w");
+ if (tmp == NULL)
+ {
+ elog(LOG, "Can't open file:%s when check temporary directory", fname);
+ ret = false;
+ goto _exit;
+ }
+
+ if (fseek(tmp, 0, SEEK_SET) != 0)
+ {
+ elog(LOG, "Can't seek file:%s when check temporary directory", fname);
+ ret = false;
+ goto _exit;
+ }
+
+ if (strlen("test") != fwrite("test", 1, strlen("test"), tmp))
+ {
+ elog(LOG, "Can't write file:%s when check temporary directory", fname);
+ ret = false;
+ goto _exit;
+ }
+
+_exit:
+ if (fname != NULL)
+ pfree(fname);
+ if (tmp != NULL)
+ fclose(tmp);
+ return ret;
+}
+
+/*
+ * Check the status of each temporary directory kept in
+ * shared memory, set to false if it is not available.
+ */
+void checkTmpDirStatus(void)
+{
+ LWLockAcquire(TmpDirInfoLock, LW_SHARED);
+
+ for (int i = 0; i < TmpDirNum; i++)
+ {
+ bool oldStatus = TmpDirInfoArray[i].available;
+ bool newStatus = CheckTmpDirAvailable(TmpDirInfoArray[i].path);
+ if (oldStatus != newStatus)
+ {
+ LWLockRelease(TmpDirInfoLock);
+ LWLockAcquire(TmpDirInfoLock, LW_EXCLUSIVE);
+ TmpDirInfoArray[i].available = newStatus;
+ LWLockRelease(TmpDirInfoLock);
+ LWLockAcquire(TmpDirInfoLock, LW_SHARED);
+ }
+ }
+
+ LWLockRelease(TmpDirInfoLock);
+ elog(LOG, "checkTmpDirStatus finish!");
}
-void getLocalTmpDirFromSegmentConfig(int session_id, int command_id, int qeidx)
+/*
+ * Get a list of failed temporary directory
+ */
+List* getFailedTmpDirList(void)
{
- List *tmpdirs = NULL;
+ List *failedList = NULL;
+ char *failedDir = NULL;
+
+ LWLockAcquire(TmpDirInfoLock, LW_SHARED);
+ for (int i = 0; i < TmpDirNum; i++)
+ {
+ if (!TmpDirInfoArray[i].available)
+ {
+ failedDir = pstrdup(TmpDirInfoArray[i].path);
+ failedList = lappend(failedList, failedDir);
+ }
+ }
+ LWLockRelease(TmpDirInfoLock);
+ return failedList;
+}
- if (qeidx == -1)
+/*
+ * Get a temporary directory path from array by its index
+ */
+char* GetTmpDirPathFromArray(int64_t idx)
+{
+ Insist(idx >=0 && idx <= TmpDirNum-1);
+
+ LWLockAcquire(TmpDirInfoLock, LW_SHARED);
+
+ if (TmpDirInfoArray[idx].available)
+ {
+ LWLockRelease(TmpDirInfoLock);
+ return TmpDirInfoArray[idx].path;
+ }
+ else
+ {
+ LWLockRelease(TmpDirInfoLock);
+ ereport(FATAL,
+ (errcode(ERRCODE_CDB_INTERNAL_ERROR),
+ errmsg("Temporary directory:%s is failed", TmpDirInfoArray[idx].path)));
+ }
+ return NULL;
+}
+
+void getMasterLocalTmpDirFromShmem(int session_id)
+{
+ LocalTempPath = GetTmpDirPathFromArray(session_id % TmpDirNum);
+}
+
+void getSegmentLocalTmpDirFromShmem(int session_id, int command_id, int qeidx)
+{
+ if(qeidx == -1)
{
- // QE on master
- getLocalTmpDirFromMasterConfig(session_id);
+ getMasterLocalTmpDirFromShmem(session_id);
}
else
{
- getLocalTmpDirFromMasterConfig(session_id);
-
- // QE on segment
- tmpdirs = initTmpDirList(tmpdirs, rm_seg_tmp_dirs);
- int64_t session_key = session_id;
- int64_t key = (session_key << 32) + command_id + qeidx;
- LocalTempPath = pstrdup((char *)lfirst(list_nth_cell(tmpdirs, key % list_length(tmpdirs))));
- destroyTmpDirList(tmpdirs);
+ int64_t key = (session_id << 32) + command_id + qeidx;
+ LocalTempPath = GetTmpDirPathFromArray(key % TmpDirNum);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/postmaster/identity.c
----------------------------------------------------------------------
diff --git a/src/backend/postmaster/identity.c b/src/backend/postmaster/identity.c
index 7851fa8..898a18a 100644
--- a/src/backend/postmaster/identity.c
+++ b/src/backend/postmaster/identity.c
@@ -414,8 +414,8 @@ SetupProcessIdentity(const char *str)
}
else
{
- getLocalTmpDirFromSegmentConfig(gp_session_id, gp_command_count, GetQEIndex());
- elog(DEBUG1, "getLocalTmpDirFromSegmentConfig session_id:%d command_id:%d qeidx:%d tmpdir:%s", gp_session_id, gp_command_count, GetQEIndex(), LocalTempPath);
+ getSegmentLocalTmpDirFromShmem(gp_session_id, gp_command_count, GetQEIndex());
+ elog(DEBUG1, "getSegmentLocalTmpDirFromShmem session_id:%d command_id:%d qeidx:%d tmpdir:%s", gp_session_id, gp_command_count, GetQEIndex(), LocalTempPath);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
index 976c896..aa7ed37 100644
--- a/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
+++ b/src/backend/resourcemanager/communication/rmcomm_RMSEG2RM.c
@@ -22,7 +22,7 @@
#include "communication/rmcomm_MessageHandler.h"
#include "communication/rmcomm_RMSEG_RM_Protocol.h"
#include "dynrm.h"
-
+#include "cdb/cdbtmpdir.h"
#include "utils/memutilities.h"
#include "utils/simplestring.h"
#include "utils/linkedlist.h"
@@ -107,9 +107,6 @@ int sendIMAlive(int *errorcode,
int errorbufsize)
{
int res = FUNC_RETURN_OK;
-
- uint16_t dummyTempDirCount = 0;
- uint16_t dummyTempDirBrokenCount = 0;
AsyncCommBuffer newcommbuffer = NULL;
Assert( DRMGlobalInstance->LocalHostStat != NULL );
@@ -119,8 +116,8 @@ int sendIMAlive(int *errorcode,
initializeSelfMaintainBuffer(&tosend, PCONTEXT);
RPCRequestHeadIMAliveData requesthead;
- requesthead.TmpDirCount = dummyTempDirCount;
- requesthead.TmpDirBrokenCount = dummyTempDirBrokenCount;
+ requesthead.TmpDirCount = TmpDirNum;
+ requesthead.TmpDirBrokenCount = DRMGlobalInstance->LocalHostStat->FailedTmpDirNum;
requesthead.Reserved = 0;
appendSMBVar(&tosend, requesthead);
@@ -146,8 +143,8 @@ int sendIMAlive(int *errorcode,
res = registerAsyncConnectionFileDesc(NULL,
DRMGlobalInstance->SendToStandby?
- standby_addr_host:
- master_addr_host,
+ standby_addr_host:
+ master_addr_host,
rm_master_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/resourcemanager/include/dynrm.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/dynrm.h b/src/backend/resourcemanager/include/dynrm.h
index 92ba8b8..4c5879e 100644
--- a/src/backend/resourcemanager/include/dynrm.h
+++ b/src/backend/resourcemanager/include/dynrm.h
@@ -120,6 +120,7 @@ bool handleRMRequestQuotaControl(void **arg);
int refreshLocalHostInstance(void);
void checkLocalPostmasterStatus(void);
+void checkTmpDirStatus(void);
/*-----------------------------------------------------------------------------
* Dynamic resource manager overall APIs
*----------------------------------------------------------------------------*/
@@ -218,6 +219,7 @@ struct DynRMGlobalData{
uint64_t LocalHostLastUpdateTime;
uint64_t HeartBeatLastSentTime;
+ uint64_t TmpDirLastCheckTime;
int32_t SegmentMemoryMB;
double SegmentCore;
/*------------------------------------------------------------------------*/
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/resourcemanager/include/resourcepool.h
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/include/resourcepool.h b/src/backend/resourcemanager/include/resourcepool.h
index f9b5081..539735d 100644
--- a/src/backend/resourcemanager/include/resourcepool.h
+++ b/src/backend/resourcemanager/include/resourcepool.h
@@ -67,6 +67,8 @@ struct SegInfoData {
uint32_t GRMRackNameOffset;
uint32_t GRMRackNameLen;
uint32_t HostAddrCount;
+ uint32_t FailedTmpDirOffset;
+ uint32_t FailedTmpDirLen;
uint8_t master;
uint8_t standby;
uint8_t alive;
@@ -95,6 +97,12 @@ typedef struct SegInfoData SegInfoData;
((char *)(seginfo) + ((seginfo)->GRMHostNameOffset))
/*
+ * Extract failed temporary string from SegInfo instance.
+ */
+#define GET_SEGINFO_FAILEDTMPDIR(seginfo) \
+ ((char *)(seginfo) + ((seginfo)->FailedTmpDirOffset))
+
+/*
* Macros for getting segment address content from SegInfo instance.
*/
#define IS_SEGINFO_ADDR_STR(attr) \
@@ -135,17 +143,14 @@ void generateSegInfoReport(SegInfo seginfo, SelfMaintainBuffer buff);
struct SegStatData {
int32_t ID; /* Internal ID. */
+ uint16_t FailedTmpDirNum; /* Failed temporary directory number */
uint8_t FTSAvailable; /* If it is available now. */
uint8_t GRMAvailable; /* If it is global resource available.*/
- uint8_t Reserved[2];
uint32_t FTSTotalMemoryMB; /* FTS reports memory capacity. */
uint32_t FTSTotalCore; /* FTS reports core capacity. */
uint32_t GRMTotalMemoryMB; /* GRM reports memory capacity. */
uint32_t GRMTotalCore; /* GRM reports core capacity. */
-
- /* 64-bit aligned. */
-
SegInfoData Info; /* 64-bit aligned. */
};
@@ -628,6 +633,8 @@ int getOrderedResourceAllocTreeIndexByRatio(uint32_t ratio, BBST *tree);
void setAllSegResourceGRMUnavailable(void);
+int getAllSegResourceFTSAvailableNumber(void);
+
struct RB_GRMContainerStatData
{
int64_t ContainerID;
@@ -657,12 +664,20 @@ void checkSlavesFile(void);
void cleanup_segment_config(void);
/* update a segment's status in gp_segment_configuration table */
void update_segment_status(int32_t id, char status);
+/* update a segment's status and failed temporary directory
+ * in gp_segment_configuration table
+ */
+void update_segment_failed_tmpdir
+(int32_t id, char status, int32_t failedNum, char* failedTmpDir);
/* Add a new entry into gp_segment_configuration table*/
void add_segment_config_row(int32_t id,
char *hostname,
char *address,
uint32_t port,
- char role);
+ char role,
+ char status,
+ uint32_t failed_tmpdir_num,
+ char* failed_tmpdir);
/*
* In resource pool, segment's id starts from 0, however in gp_segment_configuration table,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/resourcemanager/requesthandler.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c
index 2da4e4f..6f5eba2 100644
--- a/src/backend/resourcemanager/requesthandler.c
+++ b/src/backend/resourcemanager/requesthandler.c
@@ -592,7 +592,8 @@ bool handleRMSEGRequestIMAlive(void **arg)
SelfMaintainBufferData machinereport;
initializeSelfMaintainBuffer(&machinereport,PCONTEXT);
SegStat segstat = (SegStat)(SMBUFF_CONTENT(&(conntrack->MessageBuff)) +
- sizeof(RPCRequestHeadIMAliveData));
+ sizeof(RPCRequestHeadIMAliveData));
+
generateSegStatReport(segstat, &machinereport);
elog(RMLOG, "Resource manager received segment machine information, %s",
@@ -702,6 +703,25 @@ bool handleRMSEGRequestIMAlive(void **arg)
newseginfoptr = SMBUFF_HEAD(SegInfo, &(newseginfo));
newseginfoptr->HostNameLen = strlen(fts_client_host->h_name);
+ appendSelfMaintainBufferTill64bitAligned(&newseginfo);
+
+ /* fill in failed temporary directory string */
+ if (fts_client_seginfo->FailedTmpDirLen != 0)
+ {
+ newseginfoptr->FailedTmpDirOffset = getSMBContentSize(&newseginfo);
+ newseginfoptr->FailedTmpDirLen = strlen(GET_SEGINFO_FAILEDTMPDIR(fts_client_seginfo));
+ appendSMBStr(&newseginfo, GET_SEGINFO_FAILEDTMPDIR(fts_client_seginfo));
+ elog(RMLOG, "Resource manager received IMAlive message, "
+ "failed temporary directory:%s",
+ GET_SEGINFO_FAILEDTMPDIR(fts_client_seginfo));
+ appendSelfMaintainBufferTill64bitAligned(&newseginfo);
+ }
+ else
+ {
+ newseginfoptr->FailedTmpDirOffset = 0;
+ newseginfoptr->FailedTmpDirLen = 0;
+ }
+
newseginfoptr->Size = getSMBContentSize(&newseginfo);
/* reported by segment, set GRM host/rack as NULL */
newseginfoptr->GRMHostNameLen = 0;
@@ -729,7 +749,27 @@ bool handleRMSEGRequestIMAlive(void **arg)
newsegstat->ID = SEGSTAT_ID_INVALID;
newsegstat->GRMAvailable = RESOURCE_SEG_STATUS_UNSET;
- newsegstat->FTSAvailable = RESOURCE_SEG_STATUS_AVAILABLE;
+
+ RPCRequestHeadIMAlive header = SMBUFF_HEAD(RPCRequestHeadIMAlive,
+ &(conntrack->MessageBuff));
+ newsegstat->FailedTmpDirNum = header->TmpDirBrokenCount;
+
+ /*
+ * Check if the there is any failed temporary directory on this segment.
+ * if has, master considers this segment as down, even it has heart-beat report.
+ */
+ if (newsegstat->FailedTmpDirNum == 0)
+ {
+ newsegstat->FTSAvailable = RESOURCE_SEG_STATUS_AVAILABLE;
+ }
+ else
+ {
+ elog(RMLOG, "Resource manager finds there is %d failed temporary directories "
+ "on this segment, "
+ "so mark this segment unavailable.",
+ newsegstat->FailedTmpDirNum);
+ newsegstat->FTSAvailable = RESOURCE_SEG_STATUS_UNAVAILABLE;
+ }
bool capstatchanged = false;
if ( addHAWQSegWithSegStat(newsegstat, &capstatchanged) != FUNC_RETURN_OK )
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/resourcemanager/requesthandler_RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/requesthandler_RMSEG.c b/src/backend/resourcemanager/requesthandler_RMSEG.c
index c96913d..66f929a 100644
--- a/src/backend/resourcemanager/requesthandler_RMSEG.c
+++ b/src/backend/resourcemanager/requesthandler_RMSEG.c
@@ -52,6 +52,9 @@ char *buildCGroupNameString(int64 masterStartTime, uint32 connId);
*/
int refreshLocalHostInstance(void)
{
+ SimpString failedTmpDirStr;
+ initSimpleString(&failedTmpDirStr, PCONTEXT);
+
/* Get local host name. */
SimpString hostname;
initSimpleString(&hostname, PCONTEXT);
@@ -68,6 +71,33 @@ int refreshLocalHostInstance(void)
addr->Address + 4);
DQUEUE_LOOP_END
+ /* Get a list of failed temporary directory */
+ List* failedTmpDir = getFailedTmpDirList();
+ uint16_t failedTmpDirNum = list_length(failedTmpDir);
+ if (failedTmpDirNum > 0)
+ {
+ SelfMaintainBufferData buf;
+ initializeSelfMaintainBuffer(&buf, PCONTEXT);
+ uint16_t idx = 0;
+ ListCell *lc = NULL;
+ foreach(lc, failedTmpDir)
+ {
+ elog(LOG, "Get a failed temporary directory list for IMAlive message: %s",
+ (char *)lfirst(lc));
+ appendSelfMaintainBuffer(&buf, (char *)lfirst(lc), strlen((char*)lfirst(lc)));
+ if (idx != failedTmpDirNum -1)
+ {
+ appendSelfMaintainBuffer(&buf, ",", 1);
+ }
+ idx++;
+ }
+ static char zeropad = '\0';
+ appendSMBVar(&buf, zeropad);
+ setSimpleStringNoLen(&failedTmpDirStr, buf.Buffer);
+ elog(LOG, "Segment resource manager build failed tmp list string:%s", failedTmpDirStr.Str);
+ destroySelfMaintainBuffer(&buf);
+ }
+
bool shouldupdate = false;
if ( DRMGlobalInstance->LocalHostStat == NULL )
{
@@ -115,6 +145,27 @@ int refreshLocalHostInstance(void)
}
DQUEUE_LOOP_END
}
+
+ /* Check if the failed temporary directory are changed. */
+ if( !shouldupdate &&
+ DRMGlobalInstance->LocalHostStat->FailedTmpDirNum != failedTmpDirNum)
+ {
+ elog(LOG, "Segment resource manager changes number of failed "
+ "temporary from %d to %d.",
+ DRMGlobalInstance->LocalHostStat->FailedTmpDirNum,
+ failedTmpDirNum);
+ shouldupdate = true;
+ }
+
+ if ( !shouldupdate && failedTmpDirNum != 0 )
+ {
+ if (strcmp(GET_SEGINFO_FAILEDTMPDIR(info), failedTmpDirStr.Str) != 0)
+ {
+ elog(LOG, "Segment resource manager finds failed temporary directory change "
+ "from %s to %s", GET_SEGINFO_FAILEDTMPDIR(info), failedTmpDirStr.Str);
+ shouldupdate = true;
+ }
+ }
}
if ( shouldupdate )
@@ -134,8 +185,7 @@ int refreshLocalHostInstance(void)
RMSEG_INBUILDHOST->GRMAvailable = RESOURCE_SEG_STATUS_UNSET;
RMSEG_INBUILDHOST->FTSAvailable = RESOURCE_SEG_STATUS_AVAILABLE;
RMSEG_INBUILDHOST->ID = SEGSTAT_ID_INVALID;
- RMSEG_INBUILDHOST->Reserved[0] = 0;
- RMSEG_INBUILDHOST->Reserved[1] = 0;
+ RMSEG_INBUILDHOST->FailedTmpDirNum = failedTmpDirNum;
RMSEG_INBUILDHOST->Info.master = 0; /* I'm a segment. */
RMSEG_INBUILDHOST->Info.standby = 0; /* I'm a segment. */
@@ -182,6 +232,23 @@ int refreshLocalHostInstance(void)
RMSEG_INBUILDHOST->Info.GRMRackNameLen = 0;
RMSEG_INBUILDHOST->Info.GRMRackNameOffset = 0;
+ /* add failed temporary directory */
+ if (failedTmpDirNum == 0)
+ {
+ RMSEG_INBUILDHOST->Info.FailedTmpDirOffset = 0;
+ RMSEG_INBUILDHOST->Info.FailedTmpDirLen = 0;
+ }
+ else
+ {
+ RMSEG_INBUILDHOST->Info.FailedTmpDirOffset = localsegstat.Cursor + 1 -
+ offsetof(SegStatData, Info);
+ appendSelfMaintainBuffer(&localsegstat, failedTmpDirStr.Str, failedTmpDirStr.Len+1);
+ appendSelfMaintainBufferTill64bitAligned(&localsegstat);
+ RMSEG_INBUILDHOST->Info.FailedTmpDirLen = failedTmpDirStr.Len;
+ elog(LOG, "Segment resource manager builds tmp dir:%s",
+ GET_SEGINFO_FAILEDTMPDIR(&RMSEG_INBUILDHOST->Info));
+ }
+
/* get total size of this machine id. */
RMSEG_INBUILDHOST->Info.Size = localsegstat.Cursor + 1 - offsetof(SegStatData, Info);
@@ -214,6 +281,8 @@ int refreshLocalHostInstance(void)
DQUEUE_LOOP_END
removeAllDQueueNodes(&addresses);
cleanDQueue(&addresses);
+ destroyTmpDirList(failedTmpDir);
+ freeSimpleStringContent(&failedTmpDirStr);
return FUNC_RETURN_OK;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/resourcemanager/resourcemanager.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c
index de6327d..5aaf64a 100644
--- a/src/backend/resourcemanager/resourcemanager.c
+++ b/src/backend/resourcemanager/resourcemanager.c
@@ -246,8 +246,8 @@ int ResManagerMain(int argc, char *argv[])
elog(DEBUG5, "HAWQ RM :: Passed initializing core data structure.");
- /**************************************************************************/
- /* STEP 4. INIT for making RM process access catalog by CAQL etc. */
+ /**************************************************************************/
+ /* STEP 4. INIT for making RM process access catalog by CAQL etc. */
/**************************************************************************/
/* BLOCK signal behavior. Only another specific thread has the capability to
* process the signal. */
@@ -263,45 +263,42 @@ int ResManagerMain(int argc, char *argv[])
pqsignal(SIGTTIN, SIG_IGN);
pqsignal(SIGTTOU, SIG_IGN);
- /* Only master side needs the access to catalog. */
- if ( DRMGlobalInstance->Role == START_RM_ROLE_MASTER ) {
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "Resource Manager");
- CurrentResourceOwner = ResourceOwnerCreate(NULL, "Resource Manager");
+ BaseInit();
+ InitProcess();
+ InitBufferPoolBackend();
+ InitXLOGAccess();
- BaseInit();
- InitProcess();
- InitBufferPoolBackend();
- InitXLOGAccess();
+ SetProcessingMode(NormalProcessing);
- SetProcessingMode(NormalProcessing);
+ MyDatabaseId = TemplateDbOid;
+ MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
+ if (!FindMyDatabase(probeDatabase, &MyDatabaseId, &MyDatabaseTableSpace))
+ ereport(FATAL, (errcode(ERRCODE_UNDEFINED_DATABASE),
+ errmsg("database 'postgres' does not exist")));
- MyDatabaseId = TemplateDbOid;
- MyDatabaseTableSpace = DEFAULTTABLESPACE_OID;
- if (!FindMyDatabase(probeDatabase, &MyDatabaseId, &MyDatabaseTableSpace))
- ereport(FATAL, (errcode(ERRCODE_UNDEFINED_DATABASE),
- errmsg("database 'postgres' does not exist")));
+ char *fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
- char *fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace);
+ SetDatabasePath(fullpath);
- SetDatabasePath(fullpath);
+ InitProcessPhase2();
- InitProcessPhase2();
+ MyBackendId = InvalidBackendId;
- MyBackendId = InvalidBackendId;
+ SharedInvalBackendInit(false);
- SharedInvalBackendInit(false);
+ if (MyBackendId > MaxBackends || MyBackendId <= 0)
+ elog(FATAL, "bad backend id: %d", MyBackendId);
- if (MyBackendId > MaxBackends || MyBackendId <= 0)
- elog(FATAL, "bad backend id: %d", MyBackendId);
+ InitBufferPoolBackend();
+ RelationCacheInitialize();
+ InitCatalogCache();
+ RelationCacheInitializePhase2();
- InitBufferPoolBackend();
- RelationCacheInitialize();
- InitCatalogCache();
- RelationCacheInitializePhase2();
- }
- /* END: INIT for making RM process access catalog by caql etc. */
- /**************************************************************************/
- PG_SETMASK(&UnBlockSig);
+ /* END: INIT for making RM process access catalog by caql etc. */
+ /**************************************************************************/
+ PG_SETMASK(&UnBlockSig);
/* Save process fork mode. */
DRMGlobalInstance->ThisPID = getpid();
@@ -486,7 +483,10 @@ int ResManagerMainServer2ndPhase(void)
DRMGlobalInstance->SocketLocalHostName.Str,
DRMGlobalInstance->SocketLocalHostName.Str,
PostPortNumber,
- SEGMENT_ROLE_MASTER_CONFIG);
+ SEGMENT_ROLE_MASTER_CONFIG,
+ SEGMENT_STATUS_UP,
+ 0,
+ "");
/* Load queue and user definition as no DDL now. */
res = loadAllQueueAndUser();
@@ -813,6 +813,7 @@ int initializeDRMInstance(MCTYPE context)
DRMGlobalInstance->LocalHostLastUpdateTime = 0;
DRMGlobalInstance->HeartBeatLastSentTime = 0;
+ DRMGlobalInstance->TmpDirLastCheckTime = 0;
DRMGlobalInstance->LocalHostStat = NULL;
initializeDQueue(&(DRMGlobalInstance->LocalHostTempDirectoriesForQD), context);
@@ -2765,12 +2766,11 @@ int loadHostInformationIntoResourcePool(void)
segstat->FTSTotalCore = DRMGlobalInstance->SegmentCore;
segstat->GRMTotalMemoryMB = 0;
segstat->GRMTotalCore = 0;
- segstat->Reserved[0] = 0;
- segstat->Reserved[1] = 0;
+ segstat->FailedTmpDirNum = 0;
memcpy((char *)segstat + offsetof(SegStatData, Info),
- seginfobuff.Buffer,
- seginfobuff.Cursor+1);
+ seginfobuff.Buffer,
+ seginfobuff.Cursor+1);
SelfMaintainBufferData segreport;
initializeSelfMaintainBuffer(&segreport,PCONTEXT);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/resourcemanager/resourcemanager_RMSEG.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcemanager_RMSEG.c b/src/backend/resourcemanager/resourcemanager_RMSEG.c
index 966bc7c..6b29a40 100644
--- a/src/backend/resourcemanager/resourcemanager_RMSEG.c
+++ b/src/backend/resourcemanager/resourcemanager_RMSEG.c
@@ -26,6 +26,7 @@
#include "communication/rmcomm_MessageServer.h"
#include "communication/rmcomm_RMSEG2RM.h"
#include "resourceenforcer/resourceenforcer.h"
+#include "cdb/cdbtmpdir.h"
int ResManagerMainSegment2ndPhase(void)
{
@@ -60,7 +61,7 @@ int ResManagerMainSegment2ndPhase(void)
*/
initCGroupThreads();
- InitFileAccess();
+ //InitFileAccess();
/*
* Notify postmaster that HAWQ RM is ready. Ignore the possible problem that
@@ -151,6 +152,7 @@ int initializeSocketServer_RMSEG(void)
}
#define SEGMENT_HEARTBEAT_INTERVAL (3LL * 1000000LL)
#define SEGMENT_HOSTCHECK_INTERVAL (5LL * 1000000LL)
+#define SEGMENT_TMPDIRCHECK_INTERVAL (10 * 60LL * 1000000LL)
int MainHandlerLoop_RMSEG(void)
{
int res = FUNC_RETURN_OK;
@@ -191,6 +193,12 @@ int MainHandlerLoop_RMSEG(void)
checkLocalPostmasterStatus();
}
+ if ( curtime - DRMGlobalInstance->TmpDirLastCheckTime >
+ SEGMENT_TMPDIRCHECK_INTERVAL ) {
+ checkTmpDirStatus();
+ DRMGlobalInstance->TmpDirLastCheckTime = gettime_microsec();
+ }
+
if ( DRMGlobalInstance->SendIMAlive ) {
if (DRMGlobalInstance->LocalHostStat != NULL &&
curtime - DRMGlobalInstance->HeartBeatLastSentTime >
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/resourcemanager/resourcepool.c
----------------------------------------------------------------------
diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c
index 8f1d47e..07bd655 100644
--- a/src/backend/resourcemanager/resourcepool.c
+++ b/src/backend/resourcemanager/resourcepool.c
@@ -543,14 +543,106 @@ cleanup:
}
/*
+ * Update a segment's status, failed_tmpdir_num and failed_tmpdir columns
+ * in the gp_segment_configuration table, inside one BEGIN/COMMIT over libpq.
+ * id : registration_order value of the row to update
+ * status : new status character written to the status column
+ * failedNum : number of failed temporary directories
+ * failedTmpDir : failed temporary directory list, separated by comma (spliced into the SQL unescaped)
+ */
+void update_segment_failed_tmpdir
+(int32_t id, char status, int32_t failedNum, char* failedTmpDir)
+{
+ int libpqres = CONNECTION_OK;
+ PGconn *conn = NULL;
+ char conninfo[512]; /* filled by sprintf below; only %d conversions are expanded into it */
+ PQExpBuffer sql = NULL;
+ PGresult* result = NULL;
+
+ sprintf(conninfo, "options='-c gp_session_role=UTILITY -c allow_system_table_mods=dml' "
+ "dbname=template1 port=%d connect_timeout=%d", master_addr_port, CONNECT_TIMEOUT);
+ conn = PQconnectdb(conninfo);
+ if ((libpqres = PQstatus(conn)) != CONNECTION_OK)
+ {
+ elog(WARNING, "Fail to connect database when update segment's failed tmpdir "
+ "in segment configuration catalog table, error code: %d, %s",
+ libpqres,
+ PQerrorMessage(conn));
+ PQfinish(conn); /* sql and result are still NULL here, so return without going through cleanup */
+ return;
+ }
+
+ result = PQexec(conn, "BEGIN");
+ if (!result || PQresultStatus(result) != PGRES_COMMAND_OK)
+ {
+ elog(WARNING, "Fail to run SQL: %s when update segment's failed tmpdir "
+ "in segment configuration catalog table, reason : %s",
+ "BEGIN",
+ PQresultErrorMessage(result));
+ goto cleanup; /* cleanup clears result (if any) and closes conn */
+ }
+ PQclear(result); /* result is reassigned by the next PQexec before any further use */
+
+ sql = createPQExpBuffer();
+ appendPQExpBuffer(sql, "UPDATE gp_segment_configuration SET "
+ "status='%c', failed_tmpdir_num = '%d', failed_tmpdir = '%s' "
+ "WHERE registration_order=%d",
+ status, failedNum, failedTmpDir, id); /* NOTE(review): failedTmpDir is interpolated as-is; a value containing ' would break this statement */
+ result = PQexec(conn, sql->data);
+ if (!result || PQresultStatus(result) != PGRES_COMMAND_OK)
+ {
+ elog(WARNING, "Fail to run SQL: %s when update segment's failed tmpdir "
+ "in segment configuration catalog table, reason : %s",
+ sql->data,
+ PQresultErrorMessage(result));
+ goto cleanup; /* no explicit ROLLBACK is sent; the connection is simply closed in cleanup */
+ }
+ PQclear(result);
+
+ result = PQexec(conn, "COMMIT");
+ if (!result || PQresultStatus(result) != PGRES_COMMAND_OK)
+ {
+ elog(WARNING, "Fail to run SQL: %s when update segment's failed tmpdir "
+ "in segment configuration catalog table, reason : %s",
+ "COMMIT",
+ PQresultErrorMessage(result));
+ goto cleanup;
+ }
+
+ elog(LOG, "Update a segment's failed tmpdir:"
+ "status to '%c', failed_tmpdir_num to '%d', failed_tmpdir to '%s' "
+ "in segment configuration catalog table,"
+ "registration_order : %d",
+ status, failedNum, failedTmpDir, id); /* logged only after COMMIT succeeded */
+
+cleanup: /* reached on success too: releases sql, the last PGresult, and the connection */
+ if(sql)
+ destroyPQExpBuffer(sql);
+ if(result)
+ PQclear(result);
+ PQfinish(conn);
+}
+
+/*
* add a row into table gp_segment_configuration using psql
* id : registration order of this segment
* hostname : hostname of this segment
* addreess : IP address of this segment
* port : port of this segment
* role : role of this segment
+ * status : up or down
+ * failed_tmpdir_num : the number of failed temporary directory
+ * failed_tmpdir : failed temporary directory, separated by comma
*/
-void add_segment_config_row(int32_t id, char* hostname, char* address, uint32_t port, char role)
+void add_segment_config_row(int32_t id,
+ char* hostname,
+ char* address,
+ uint32_t port,
+ char role,
+ char status,
+ uint32_t
+ failed_tmpdir_num,
+ char* failed_tmpdir)
{
int libpqres = CONNECTION_OK;
PGconn *conn = NULL;
@@ -583,11 +675,23 @@ void add_segment_config_row(int32_t id, char* hostname, char* address, uint32_t
PQclear(result);
sql = createPQExpBuffer();
- appendPQExpBuffer(sql,
- "INSERT INTO gp_segment_configuration(registration_order,role,status,port,hostname,address) "
- "VALUES "
- "(%d,'%c','%c',%d,'%s','%s')",
- id,role,SEGMENT_STATUS_UP,port,hostname,address);
+ if (role == SEGMENT_ROLE_PRIMARY)
+ {
+ appendPQExpBuffer(sql,
+ "INSERT INTO gp_segment_configuration"
+ "(registration_order,role,status,port,hostname,address,failed_tmpdir_num,failed_tmpdir) "
+ "VALUES "
+ "(%d,'%c','%c',%d,'%s','%s',%d,'%s')",
+ id,role,status,port,hostname,address,failed_tmpdir_num,failed_tmpdir);
+ }
+ else
+ {
+ appendPQExpBuffer(sql,
+ "INSERT INTO gp_segment_configuration(registration_order,role,status,port,hostname,address) "
+ "VALUES "
+ "(%d,'%c','%c',%d,'%s','%s')",
+ id,role,status,port,hostname,address);
+ }
result = PQexec(conn, sql->data);
if (!result || PQresultStatus(result) != PGRES_COMMAND_OK)
{
@@ -610,8 +714,8 @@ void add_segment_config_row(int32_t id, char* hostname, char* address, uint32_t
}
elog(LOG, "Add a new row into segment configuration catalog table,"
- "registration order:%d, role:%c, port:%d, hostname:%s, address:%s",
- id, role, port, hostname, address);
+ "registration order:%d, role:%c, status:%c, port:%d, hostname:%s, address:%s",
+ id, role, status, port, hostname, address);
cleanup:
if(sql)
@@ -678,7 +782,8 @@ int addHAWQSegWithSegStat(SegStat segstat, bool *capstatchanged)
/* CASE 1. It is a new host. */
if ( res != FUNC_RETURN_OK )
{
- *capstatchanged = true;
+ uint8_t reportStatus = segstat->FTSAvailable;
+
/* Create machine information and corresponding resource information. */
segresource = createSegResource(segstat);
@@ -741,10 +846,11 @@ int addHAWQSegWithSegStat(SegStat segstat, bool *capstatchanged)
}
/*
- * Set this node HAWQ available. This is a new host, it is always set
- * HAWQ available because this is from FTS heart-beat of one segment.
+ * This is a new host registration. Normally the status is available,
+ * But if the number of failed temporary directory exceeds guc,
+ * this segment is considered as unavailable.
*/
- setSegResHAWQAvailability(segresource, RESOURCE_SEG_STATUS_AVAILABLE);
+ setSegResHAWQAvailability(segresource, reportStatus);
/* Add this node into the table gp_segment_configuration */
AddressString straddr = NULL;
@@ -753,19 +859,29 @@ int addHAWQSegWithSegStat(SegStat segstat, bool *capstatchanged)
if (Gp_role != GP_ROLE_UTILITY)
{
- add_segment_config_row(segid+REGISTRATION_ORDER_OFFSET,
- hostname,
- straddr->Address,
- segresource->Stat->Info.port,
- SEGMENT_ROLE_PRIMARY);
+ add_segment_config_row (segid+REGISTRATION_ORDER_OFFSET,
+ hostname,
+ straddr->Address,
+ segresource->Stat->Info.port,
+ SEGMENT_ROLE_PRIMARY,
+ segresource->Stat->FTSAvailable == RESOURCE_SEG_STATUS_AVAILABLE ?
+ SEGMENT_STATUS_UP:SEGMENT_STATUS_DOWN,
+ segresource->Stat->FailedTmpDirNum,
+ segresource->Stat->FailedTmpDirNum == 0 ?
+ "":GET_SEGINFO_FAILEDTMPDIR(&segresource->Stat->Info));
+ }
+
+ if (segresource->Stat->FTSAvailable == RESOURCE_SEG_STATUS_AVAILABLE)
+ {
+ /* Add this node into the io bytes workload BBST structure. */
+ addSegResourceCombinedWorkloadIndex(segresource);
+ /* Add this node into the alloc/avail resource ordered indices. */
+ addSegResourceAvailIndex(segresource);
+ addSegResourceAllocIndex(segresource);
+ segcapchanged = true;
+ *capstatchanged = true;
}
- /* Add this node into the io bytes workload BBST structure. */
- addSegResourceCombinedWorkloadIndex(segresource);
- /* Add this node into the alloc/avail resource ordered indices. */
- addSegResourceAvailIndex(segresource);
- addSegResourceAllocIndex(segresource);
- segcapchanged = true;
res = FUNC_RETURN_OK;
}
/*
@@ -777,31 +893,165 @@ int addHAWQSegWithSegStat(SegStat segstat, bool *capstatchanged)
segresource = getSegResource(segid);
Assert(segresource != NULL);
- if ( !IS_SEGSTAT_FTSAVAILABLE(segresource->Stat) )
+ /* Check if temporary directory path is changed */
+ bool tmpDirChanged = false;
+ if (segresource->Stat->FailedTmpDirNum != segstat->FailedTmpDirNum)
+ {
+ tmpDirChanged = true;
+ }
+
+ if (!tmpDirChanged && segresource->Stat->FailedTmpDirNum != 0)
{
- setSegResHAWQAvailability(segresource, RESOURCE_SEG_STATUS_AVAILABLE);
- if (Gp_role != GP_ROLE_UTILITY)
+ if (strcmp(GET_SEGINFO_FAILEDTMPDIR(&segresource->Stat->Info),
+ GET_SEGINFO_FAILEDTMPDIR(&segstat->Info)) != 0)
{
- update_segment_status(segresource->Stat->ID + REGISTRATION_ORDER_OFFSET,
- SEGMENT_STATUS_UP);
+ tmpDirChanged = true;
+ elog(LOG, "Resource manager finds segment %s(%d) 's "
+ "failed temporary directory is changed from "
+ "'%s' to '%s'",
+ GET_SEGRESOURCE_HOSTNAME(segresource),
+ segid,
+ GET_SEGINFO_FAILEDTMPDIR(&segresource->Stat->Info),
+ GET_SEGINFO_FAILEDTMPDIR(&segstat->Info));
}
+ }
- elog(LOG, "Resource manager sets segment %s(%d) up from down.",
- GET_SEGRESOURCE_HOSTNAME(segresource),
- segid);
+ /*
+ * Either the FTSAvailable or the failed temporary directory
+ * of this segment is changed.
+ */
+ uint8_t oldStatus = segresource->Stat->FTSAvailable;
+ bool statusChanged = oldStatus != segstat->FTSAvailable;
+ if (statusChanged || tmpDirChanged)
+ {
+ if (statusChanged && !tmpDirChanged)
+ {
+ if (Gp_role != GP_ROLE_UTILITY)
+ {
+ update_segment_status(segresource->Stat->ID + REGISTRATION_ORDER_OFFSET,
+ segstat->FTSAvailable == RESOURCE_SEG_STATUS_AVAILABLE ?
+ SEGMENT_STATUS_UP:SEGMENT_STATUS_DOWN);
+ }
- /* The segment is up again, its capacity should be considered again. */
- *capstatchanged = true;
+ /*
+ * Segment is set from up to down, return resource.
+ */
+ if (oldStatus == RESOURCE_SEG_STATUS_AVAILABLE)
+ {
+ /* The segment is up again, its capacity should be considered again. */
+ *capstatchanged = true;
+ returnAllGRMResourceFromSegment(segresource);
+ }
+
+ elog(LOG, "Master resource manager sets segment %s(%d)'s status "
+ "to %c",
+ GET_SEGRESOURCE_HOSTNAME(segresource),
+ segid,
+ segstat->FTSAvailable == RESOURCE_SEG_STATUS_AVAILABLE ?
+ SEGMENT_STATUS_UP:SEGMENT_STATUS_DOWN);
+ }
+ else
+ {
+ /*
+ * Failed temporary directory is changed,
+ * if the length of new failed temporary directory exceeds the old one,
+ * we need to repalloc SegInfoData
+ */
+ elog(RMLOG, "Master resource manager is going to set segment %s(%d)'s "
+ "failed temporary directory from '%s' to '%s'",
+ GET_SEGRESOURCE_HOSTNAME(segresource),
+ segid,
+ segresource->Stat->FailedTmpDirNum == 0 ?
+ "" : GET_SEGINFO_FAILEDTMPDIR(&segresource->Stat->Info),
+ segstat->FailedTmpDirNum == 0 ?
+ "" : GET_SEGINFO_FAILEDTMPDIR(&segstat->Info));
+
+ int old = segresource->Stat->Info.FailedTmpDirLen == 0 ?
+ 0 :__SIZE_ALIGN64(segresource->Stat->Info.FailedTmpDirLen+1);
+ int new = segstat->Info.FailedTmpDirLen == 0 ?
+ 0 : __SIZE_ALIGN64(segstat->Info.FailedTmpDirLen+1);
+ if (new > old &&
+ segresource->Stat->Info.Size -
+ (segresource->Stat->Info.HostNameOffset + __SIZE_ALIGN64(segresource->Stat->Info.HostNameLen+1))
+ < new)
+ {
+ SegStat newSegStat = rm_repalloc(PCONTEXT,
+ segresource->Stat,
+ offsetof(SegStatData, Info) +
+ segresource->Stat->Info.Size + (new - old));
+ segresource->Stat = newSegStat;
+ memset((char*)&segresource->Stat->Info + segresource->Stat->Info.Size, 0, (new - old));
+ segresource->Stat->Info.Size += (new - old);
+ }
+
+ if (segstat->FailedTmpDirNum != 0)
+ {
+ segresource->Stat->Info.FailedTmpDirOffset = segresource->Stat->Info.HostNameOffset +
+ __SIZE_ALIGN64(segresource->Stat->Info.HostNameLen+1);
+ memcpy((char *)&segresource->Stat->Info + segresource->Stat->Info.FailedTmpDirOffset,
+ GET_SEGINFO_FAILEDTMPDIR(&segstat->Info),
+ strlen(GET_SEGINFO_FAILEDTMPDIR(&segstat->Info)));
+ memset((char *)&segresource->Stat->Info +
+ segresource->Stat->Info.FailedTmpDirOffset +
+ segstat->Info.FailedTmpDirLen,
+ 0,
+ segresource->Stat->Info.Size -
+ segresource->Stat->Info.FailedTmpDirOffset -
+ segstat->Info.FailedTmpDirLen);
+ }
+ else
+ {
+ memset((char *)&segresource->Stat->Info + segresource->Stat->Info.FailedTmpDirOffset,
+ 0,
+ segresource->Stat->Info.Size - segresource->Stat->Info.FailedTmpDirOffset);
+ segresource->Stat->Info.FailedTmpDirOffset = 0;
+ }
+ segresource->Stat->Info.FailedTmpDirLen = segstat->Info.FailedTmpDirLen;
+ segresource->Stat->FailedTmpDirNum = segstat->FailedTmpDirNum;
+
+ setSegResHAWQAvailability(segresource, segstat->FTSAvailable);
+ if (Gp_role != GP_ROLE_UTILITY)
+ {
+ update_segment_failed_tmpdir(segresource->Stat->ID + REGISTRATION_ORDER_OFFSET,
+ segresource->Stat->FTSAvailable == RESOURCE_SEG_STATUS_AVAILABLE ?
+ SEGMENT_STATUS_UP:SEGMENT_STATUS_DOWN,
+ segresource->Stat->FailedTmpDirNum,
+ segresource->Stat->FailedTmpDirNum == 0 ?
+ "" : GET_SEGINFO_FAILEDTMPDIR(&segresource->Stat->Info));
+ }
+
+ if (statusChanged)
+ {
+ *capstatchanged = true;
+ /*
+ * Segment is set from up to down, return resource.
+ */
+ if (oldStatus == RESOURCE_SEG_STATUS_AVAILABLE)
+ {
+ returnAllGRMResourceFromSegment(segresource);
+ }
+ }
+
+ elog(LOG, "Master resource manager sets segment %s(%d)'s "
+ "failed temporary directory to '%s', status:%c",
+ GET_SEGRESOURCE_HOSTNAME(segresource),
+ segid,
+ segresource->Stat->FailedTmpDirNum == 0 ?
+ "" : GET_SEGINFO_FAILEDTMPDIR(&segresource->Stat->Info),
+ segresource->Stat->FTSAvailable == RESOURCE_SEG_STATUS_AVAILABLE ?
+ SEGMENT_STATUS_UP : SEGMENT_STATUS_DOWN);
+ }
}
/* The machine should be up. Update port number. */
segresource->Stat->Info.port = segstat->Info.port;
/* Update node capacity. */
- if (((segstat->FTSTotalCore > 0) &&
+ if ( segresource->Stat->FTSAvailable == RESOURCE_SEG_STATUS_AVAILABLE &&
+ (((segstat->FTSTotalCore > 0) &&
segresource->Stat->FTSTotalCore != segstat->FTSTotalCore) ||
((segstat->FTSTotalMemoryMB > 0) &&
- segresource->Stat->FTSTotalMemoryMB != segstat->FTSTotalMemoryMB))
+ segresource->Stat->FTSTotalMemoryMB != segstat->FTSTotalMemoryMB)))
{
uint32_t oldftsmem = segresource->Stat->FTSTotalMemoryMB;
@@ -831,8 +1081,7 @@ int addHAWQSegWithSegStat(SegStat segstat, bool *capstatchanged)
segcapchanged =
oldftsmem != segresource->Stat->FTSTotalMemoryMB ||
oldftscore != segresource->Stat->FTSTotalCore;
-
- *capstatchanged = segcapchanged;
+ *capstatchanged = *capstatchanged ? true:segcapchanged;
}
/* update the status of this node */
@@ -1061,6 +1310,25 @@ void setAllSegResourceGRMUnavailable(void)
freePAIRRefList(&(PRESPOOL->Segments), &allsegres);
}
+int getAllSegResourceFTSAvailableNumber(void) /* returns how many entries of PRESPOOL->Segments have Stat->FTSAvailable == RESOURCE_SEG_STATUS_AVAILABLE */
+{
+ int cnt = 0;
+ List *allsegres = NULL;
+ ListCell *cell = NULL;
+ getAllPAIRRefIntoList(&(PRESPOOL->Segments), &allsegres); /* presumably fills allsegres with references to the table's PAIR entries -- confirm */
+
+ foreach(cell, allsegres)
+ {
+ SegResource segres = (SegResource)(((PAIR)lfirst(cell))->Value); /* each PAIR's Value is treated as a SegResource */
+ if (segres->Stat->FTSAvailable == RESOURCE_SEG_STATUS_AVAILABLE)
+ {
+ cnt++;
+ }
+ }
+ freePAIRRefList(&(PRESPOOL->Segments), &allsegres); /* presumably releases the temporary list built above -- confirm */
+ return cnt;
+}
+
/*
* Check index to get host id based on host name string.
*/
@@ -1116,9 +1384,9 @@ SegResource createSegResource(SegStat segstat)
res->NVSeg = 0;
res->Stat = segstat;
res->LastUpdateTime = gettime_microsec();
+ res->RUAlivePending = false;
res->Stat->FTSAvailable = RESOURCE_SEG_STATUS_UNSET;
res->Stat->GRMAvailable = RESOURCE_SEG_STATUS_UNSET;
- res->RUAlivePending = false;
for ( int i = 0 ; i < RESOURCE_QUEUE_RATIO_SIZE ; ++i )
{
@@ -1159,7 +1427,8 @@ int setSegResHAWQAvailability( SegResource segres, uint8_t newstatus)
return res;
}
- if ( newstatus == RESOURCE_SEG_STATUS_UNAVAILABLE )
+ if ( res == RESOURCE_SEG_STATUS_AVAILABLE &&
+ newstatus == RESOURCE_SEG_STATUS_UNAVAILABLE )
{
minusResourceBundleData(&(PRESPOOL->FTSTotal),
segres->Stat->FTSTotalMemoryMB,
@@ -1174,7 +1443,7 @@ int setSegResHAWQAvailability( SegResource segres, uint8_t newstatus)
Assert(PRESPOOL->AvailNodeCount >= 0);
setSegResRUAlivePending(segres, false);
}
- else
+ else if (newstatus == RESOURCE_SEG_STATUS_AVAILABLE)
{
addResourceBundleData(&(PRESPOOL->FTSTotal),
segres->Stat->FTSTotalMemoryMB,
@@ -1189,6 +1458,11 @@ int setSegResHAWQAvailability( SegResource segres, uint8_t newstatus)
PRESPOOL->AvailNodeCount++;
}
}
+ else
+ {
+ /* Unset to unavailable, just return */
+ return res;
+ }
for ( int i = 0 ; i < PQUEMGR->RatioCount ; ++i )
{
@@ -1281,6 +1555,16 @@ void generateSegInfoReport(SegInfo seginfo, SelfMaintainBuffer buff)
}
generateSegInfoAddrStr(seginfo, i, buff);
}
+
+ appendSelfMaintainBuffer(buff, ".", sizeof(".") - 1);
+ if (seginfo->FailedTmpDirLen != 0)
+ {
+ appendSelfMaintainBuffer(buff, "Failed Tmp Dir:", sizeof("Failed Tmp Dir:")-1);
+ appendSelfMaintainBuffer(buff,
+ GET_SEGINFO_FAILEDTMPDIR(seginfo),
+ seginfo->FailedTmpDirLen);
+ }
+
appendSMBVar(buff, zeropad);
}
@@ -3624,12 +3908,13 @@ void validateResourcePoolStatus(bool refquemgr)
Assert( availtree != NULL );
traverseBBSTMidOrder(availtree, &line);
- if ( line.NodeCount != PRESPOOL->Segments.NodeCount )
+ int availableCnt = getAllSegResourceFTSAvailableNumber();
+ if ( line.NodeCount != availableCnt )
{
elog(ERROR, "HAWQ RM Validation. The available resource ordered index "
"contains %d nodes, expect %d nodes.",
line.NodeCount,
- PRESPOOL->Segments.NodeCount);
+ availableCnt);
}
SegResource prevres = NULL;
@@ -3678,12 +3963,13 @@ void validateResourcePoolStatus(bool refquemgr)
Assert( alloctree != NULL );
traverseBBSTMidOrder(alloctree, &line);
- if ( line.NodeCount != PRESPOOL->Segments.NodeCount )
+ int availableCnt = getAllSegResourceFTSAvailableNumber();
+ if ( line.NodeCount != availableCnt )
{
elog(ERROR, "HAWQ RM Validation. The allocated resource ordered index "
"contains %d nodes, expect %d nodes.",
line.NodeCount,
- PRESPOOL->Segments.NodeCount);
+ availableCnt);
}
SegResource prevres = NULL;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/storage/ipc/ipci.c
----------------------------------------------------------------------
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 90c87dc..4898c73 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -63,6 +63,7 @@
#include "executor/spi.h"
#include "utils/workfile_mgr.h"
#include "cdb/cdbmetadatacache.h"
+#include "cdb/cdbtmpdir.h"
#include "utils/session_state.h"
shmem_startup_hook_type shmem_startup_hook = NULL;
@@ -173,6 +174,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, MetadataCache_ShmemSize());
elog(LOG, "Metadata Cache Share Memory Size : %lu", MetadataCache_ShmemSize());
}
+ size = add_size(size, TmpDirInfoArrayShmemSize());
#ifdef FAULT_INJECTOR
size = add_size(size, FaultInjector_ShmemSize());
@@ -293,7 +295,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
{
MetadataCache_ShmemInit();
}
-
+ TmpDirInfoArrayShmemInit();
if (!IsUnderPostmaster)
{
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/utils/gp/segadmin.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/gp/segadmin.c b/src/backend/utils/gp/segadmin.c
index 6a7d3d5..6b49926 100644
--- a/src/backend/utils/gp/segadmin.c
+++ b/src/backend/utils/gp/segadmin.c
@@ -277,6 +277,8 @@ gp_add_master_standby(PG_FUNCTION_ARGS)
values[Anum_gp_segment_configuration_port - 1] = Int32GetDatum(master->port);
values[Anum_gp_segment_configuration_hostname - 1] = PG_GETARG_DATUM(0);
values[Anum_gp_segment_configuration_address - 1] = PG_GETARG_DATUM(1);
+ nulls[Anum_gp_segment_configuration_failed_tmpdir_num - 1] = true;
+ nulls[Anum_gp_segment_configuration_failed_tmpdir - 1] = true;
tuple = caql_form_tuple(pcqCtx, values, nulls);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/backend/utils/init/postinit.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index e4d0752..8d48836 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -441,8 +441,8 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
}
else
{
- getLocalTmpDirFromMasterConfig(gp_session_id);
- elog(LOG, "getLocalTmpDirFromMasterConfig session_id:%d tmpdir:%s", gp_session_id, LocalTempPath);
+ getMasterLocalTmpDirFromShmem(gp_session_id);
+ elog(LOG, "getMasterLocalTmpDirFromShmem session_id:%d tmpdir:%s", gp_session_id, LocalTempPath);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/include/catalog/gp_segment_config.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/gp_segment_config.h b/src/include/catalog/gp_segment_config.h
index ad489a4..f9f2def 100644
--- a/src/include/catalog/gp_segment_config.h
+++ b/src/include/catalog/gp_segment_config.h
@@ -52,7 +52,9 @@
status "char" ,
port integer ,
hostname text ,
- address text
+ address text ,
+ failed_tmpdir_num integer ,
+ failed_tmpdir text
);
create unique index on gp_segment_configuration(registration_order) with (indexid=6106, indexname=gp_segment_config_registration_order_index);
@@ -64,7 +66,7 @@
WARNING: DO NOT MODIFY THE FOLLOWING SECTION:
Generated by tidycat.pl version 34
- on Thu Feb 5 16:28:45 2015
+ on Sat Jan 2 21:01:04 2016
*/
@@ -93,6 +95,8 @@ CATALOG(gp_segment_configuration,5036) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
int4 port;
text hostname;
text address;
+ int4 failed_tmpdir_num;
+ text failed_tmpdir;
} FormData_gp_segment_configuration;
@@ -108,13 +112,16 @@ typedef FormData_gp_segment_configuration *Form_gp_segment_configuration;
* compiler constants for gp_segment_configuration
* ----------------
*/
-#define Natts_gp_segment_configuration 6
+#define Natts_gp_segment_configuration 8
#define Anum_gp_segment_configuration_registration_order 1
#define Anum_gp_segment_configuration_role 2
#define Anum_gp_segment_configuration_status 3
#define Anum_gp_segment_configuration_port 4
#define Anum_gp_segment_configuration_hostname 5
#define Anum_gp_segment_configuration_address 6
+#define Anum_gp_segment_configuration_failed_tmpdir_num 7
+#define Anum_gp_segment_configuration_failed_tmpdir 8
+
/* TIDYCAT_END_CODEGEN */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/include/cdb/cdbtmpdir.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbtmpdir.h b/src/include/cdb/cdbtmpdir.h
index 62d3046..63feb8d 100644
--- a/src/include/cdb/cdbtmpdir.h
+++ b/src/include/cdb/cdbtmpdir.h
@@ -19,8 +19,27 @@
#ifndef CDBTMPDIR_H
#define CDBTMPDIR_H
+#include "c.h"
-void getLocalTmpDirFromMasterConfig(int session_id);
-void getLocalTmpDirFromSegmentConfig(int session_id, int command_id, int qeidx);
+#define MAX_TMP_DIR_LEN 8192
+
+typedef struct TmpDirInfo
+{
+ bool available; /* NOTE(review): presumably whether this temporary directory is currently usable -- confirm in cdbtmpdir.c */
+ char path[MAX_TMP_DIR_LEN]; /* fixed-size path buffer of MAX_TMP_DIR_LEN (8192) bytes */
+} TmpDirInfo;
+
+extern int32_t TmpDirNum;
+
+Size TmpDirInfoArrayShmemSize(void);
+void TmpDirInfoArrayShmemInit(void);
+char* GetTmpDirPathFromArray(int64_t idx);
+bool DestroyTmpDirInfoArray(TmpDirInfo *info);
+bool CheckTmpDirAvailable(char *path);
+void destroyTmpDirList(List *list);
+void checkTmpDirStatus(void);
+
+void getMasterLocalTmpDirFromShmem(int session_id);
+void getSegmentLocalTmpDirFromShmem(int session_id, int command_id, int qeidx);
#endif
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/src/include/storage/lwlock.h
----------------------------------------------------------------------
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 9272fc7..b597651 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -86,7 +86,8 @@ typedef enum LWLockId
AOSegFileLock,
ParquetSegFileLock,
PersistentObjLock,
- MetadataCacheLock,
+ MetadataCacheLock,
+ TmpDirInfoLock,
FileRepShmemLock,
FileRepAckShmemLock,
FileRepAckHashShmemLock,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7df00d58/tools/bin/gppylib/data/2.0.json
----------------------------------------------------------------------
diff --git a/tools/bin/gppylib/data/2.0.json b/tools/bin/gppylib/data/2.0.json
index af8bede..3fbd349 100644
--- a/tools/bin/gppylib/data/2.0.json
+++ b/tools/bin/gppylib/data/2.0.json
@@ -1,5 +1,5 @@
{
- "__comment" : "Generated by ./tidycat.pl version 34 on Thu Sep 17 15:49:18 2015 CATALOG_VERSION_NO=201507221",
+ "__comment" : "Generated by tidycat.pl version 34 on Sat Jan 2 21:08:04 2016 CATALOG_VERSION_NO=201507221",
"__info" : {
"CATALOG_VERSION_NO" : "201507221"
},
@@ -940,6 +940,8 @@
"UppercaseToastReltypeOid" : "GP_SEGMENT_CONFIGURATION_TOAST_RELTYPE_OID",
"colh" : {
"address" : "text",
+ "failed_tmpdir" : "text",
+ "failed_tmpdir_num" : "int4",
"hostname" : "text",
"port" : "int4",
"registration_order" : "int4",
@@ -977,6 +979,16 @@
"colname" : "address",
"ctype" : "text",
"sqltype" : "text"
+ },
+ {
+ "colname" : "failed_tmpdir_num",
+ "ctype" : "int4",
+ "sqltype" : "integer"
+ },
+ {
+ "colname" : "failed_tmpdir",
+ "ctype" : "text",
+ "sqltype" : "text"
}
],
"filename" : "gp_segment_config.h",
@@ -1015,7 +1027,7 @@
}
],
"relid_comment_tag" : "/* relation id: 5036 - gp_segment_configuration */\n",
- "tabdef_text" : "\n CREATE TABLE gp_segment_configuration\n with (camelcase=GpSegmentConfig, shared=true, oid=false, relid=5036, reltype_oid=6442, toast_oid=2900, toast_index=2901, toast_reltype=2906, content=MASTER_ONLY)\n (\n registration_order integer ,\n role \"char\" ,\n status \"char\" ,\n port integer ,\n hostname text ,\n address text\n )",
+ "tabdef_text" : "\n CREATE TABLE gp_segment_configuration\n with (camelcase=GpSegmentConfig, shared=true, oid=false, relid=5036, reltype_oid=6442, toast_oid=2900, toast_index=2901, toast_reltype=2906, content=MASTER_ONLY)\n (\n registration_order integer ,\n role \"char\" ,\n status \"char\" ,\n port integer ,\n hostname text ,\n address text ,\n failed_tmpdir_num integer ,\n failed_tmpdir text\n )",
"with" : {
"bootstrap" : 0,
"camelcase" : "GpSegmentConfig",